Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r--    src/mongo/db/serverless/shard_split_donor_service.cpp    137
1 file changed, 58 insertions(+), 79 deletions(-)
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index d240f9c5496..defb74e9a3c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -116,15 +116,8 @@ void insertTenantAccessBlocker(WithLock lk,
ShardSplitDonorDocument donorStateDoc) {
auto optionalTenants = donorStateDoc.getTenantIds();
invariant(optionalTenants);
-
- auto recipientTagName = donorStateDoc.getRecipientTagName();
- auto recipientSetName = donorStateDoc.getRecipientSetName();
- invariant(recipientTagName);
- invariant(recipientSetName);
-
- auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- auto recipientConnectionString =
- serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+ auto recipientConnectionString = donorStateDoc.getRecipientConnectionString();
+ invariant(recipientConnectionString);
for (const auto& tenantId : optionalTenants.get()) {
auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
@@ -132,7 +125,7 @@ void insertTenantAccessBlocker(WithLock lk,
donorStateDoc.getId(),
tenantId.toString(),
MigrationProtocolEnum::kMultitenantMigrations,
- recipientConnectionString.toString());
+ recipientConnectionString->toString());
TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
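
Note on this hunk: insertTenantAccessBlocker no longer rebuilds the recipient connection string from the replica set config; it trusts the value persisted on the donor state document. A minimal sketch of the writer side, lifted from the _enterBlockingOrAbortedState hunk later in this patch, showing where that field is expected to be set before the blocker ever reads it:

    auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
    auto recipientConnectionString =
        serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
    _stateDoc.setRecipientConnectionString(recipientConnectionString);
    _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
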
@@ -154,13 +147,7 @@ namespace detail {
SemiFuture<void> makeRecipientAcceptSplitFuture(
std::shared_ptr<executor::TaskExecutor> taskExecutor,
const CancellationToken& token,
- const StringData& recipientTagName,
- const StringData& recipientSetName) {
-
- auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
- invariant(replCoord);
- auto recipientConnectionString = serverless::makeRecipientConnectionString(
- replCoord->getConfig(), recipientTagName, recipientSetName);
+ const ConnectionString& recipientConnectionString) {
// build a vector of single server discovery monitors to listen for heartbeats
auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
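
Note on this hunk: makeRecipientAcceptSplitFuture now receives the resolved ConnectionString directly instead of the recipient tag and set names, so the helper no longer consults the ReplicationCoordinator itself. A sketch of the new call shape, matching how _enterBlockingOrAbortedState invokes it later in this patch:

    // The caller derives the connection string (e.g. via serverless::makeRecipientConnectionString)
    // and passes it in; the helper only builds the monitors and the acceptance future.
    auto future = detail::makeRecipientAcceptSplitFuture(
        taskExecutor, token, recipientConnectionString);
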
@@ -257,6 +244,8 @@ ExecutorFuture<void> ShardSplitDonorService::_rebuildService(
return _createStateDocumentTTLIndex(executor, token);
}
+boost::optional<TaskExecutorPtr>
+ ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest;
ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
ServiceContext* serviceContext,
ShardSplitDonorService* splitService,
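
Note on this hunk: the new static _splitAcceptanceTaskExecutorForTest lets unit tests substitute the task executor used to build the split-acceptance future. A hypothetical usage sketch; the injection point is real, but makeTestExecutor() and the surrounding fixture are assumptions for illustration, not part of this patch:

    // Install a test-controlled executor before driving the state machine ...
    ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest =
        makeTestExecutor();
    // ... exercise the shard split, then clear it so later runs fall back to the
    // instance cleanup executor selected in _enterBlockingOrAbortedState.
    ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest = boost::none;
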
@@ -383,15 +372,13 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
"id"_attr = _migrationId,
"timeout"_attr = repl::shardSplitTimeoutMS.load());
- _createReplicaSetMonitor(abortToken);
-
_decisionPromise.setWith([&] {
return ExecutorFuture(**executor)
.then([this, executor, primaryToken] {
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
- return _writeInitialDocument(executor, primaryToken);
+ return _enterBlockingOrAbortedState(executor, primaryToken);
})
.then([this, executor, abortToken] {
checkForTokenInterrupt(abortToken);
@@ -561,7 +548,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
}
return ExecutorFuture(**executor)
- .then([&]() { return _recipientAcceptedSplit.getFuture(); })
+ .then([&]() { return _splitAcceptancePromise.getFuture(); })
.then([this, executor, token] {
LOGV2(6142503,
"Recipient has accepted the split, committing decision.",
@@ -574,13 +561,12 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
});
}
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDocument(
+ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
ShardSplitDonorStateEnum nextState;
{
stdx::lock_guard<Latch> lg(_mutex);
-
if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
if (isAbortedDocumentPersistent(lg, _stateDoc)) {
// Node has stepped up and created an instance using a document in abort state. No need
@@ -594,14 +580,42 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc
_abortReason->serializeErrorToBSON(&bob);
_stateDoc.setAbortReason(bob.obj());
nextState = ShardSplitDonorStateEnum::kAborted;
+ } else {
+ auto recipientTagName = _stateDoc.getRecipientTagName();
+ invariant(recipientTagName);
+ auto recipientSetName = _stateDoc.getRecipientSetName();
+ invariant(recipientSetName);
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ auto recipientConnectionString = serverless::makeRecipientConnectionString(
+ config, *recipientTagName, *recipientSetName);
+
+ // Always start the replica set monitor if we haven't reached a decision yet
+ _splitAcceptancePromise.setWith([&]() -> Future<void> {
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
+ MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
+ return SemiFuture<void>::makeReady().unsafeToInlineFuture();
+ }
+
+ // Optionally select a task executor for unit testing
+ auto executor = _splitAcceptanceTaskExecutorForTest
+ ? *_splitAcceptanceTaskExecutorForTest
+ : _shardSplitService->getInstanceCleanupExecutor();
+
+ return detail::makeRecipientAcceptSplitFuture(
+ executor, primaryServiceToken, recipientConnectionString)
+ .unsafeToInlineFuture();
+ });
+
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
+ // Node has stepped up and resumed a shard split. No need to write the document as it
+ // already exists.
+ return ExecutorFuture(**executor);
+ }
- } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) {
+ // Otherwise, record the recipient connection string
+ _stateDoc.setRecipientConnectionString(recipientConnectionString);
_stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
nextState = ShardSplitDonorStateEnum::kBlocking;
- } else {
- // Node has step up and resumed a shard split. No need to write the document as it
- // already exists.
- return ExecutorFuture(**executor);
}
}
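
Note on this hunk: _enterBlockingOrAbortedState now arms _splitAcceptancePromise while entering the blocking state, replacing the separate _createReplicaSetMonitor call that run() used to make (removed elsewhere in this patch). The promise set here is the one the decision chain waits on in the earlier hunk that waits for the recipient to accept the split; a condensed producer/consumer view assembled from fragments of this patch:

    // Producer: ready immediately if a decision was already reached (or the test failpoint
    // skipShardSplitWaitForSplitAcceptance is set), otherwise the recipient-acceptance future.
    _splitAcceptancePromise.setWith([&]() -> Future<void> { /* see the hunk above */ });

    // Consumer: the decision path blocks on acceptance before committing the split.
    return ExecutorFuture(**executor)
        .then([&]() { return _splitAcceptancePromise.getFuture(); })
        .then([this, executor, token] { /* commit the split decision */ });
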
@@ -756,36 +770,6 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
.semi();
}
-void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
- const CancellationToken& abortToken) {
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
- return;
- }
- }
-
- auto future = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { // Test-only.
- return SemiFuture<void>::makeReady();
- }
-
- auto recipientTagName = _stateDoc.getRecipientTagName();
- auto recipientSetName = _stateDoc.getRecipientSetName();
- invariant(recipientTagName);
- invariant(recipientSetName);
-
- return detail::makeRecipientAcceptSplitFuture(
- _shardSplitService->getInstanceCleanupExecutor(),
- abortToken,
- *recipientTagName,
- *recipientSetName);
- }();
-
- _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture());
-}
-
ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState>
ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
StatusWith<DurableState> statusWithState,
@@ -846,26 +830,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
});
}
-ExecutorFuture<repl::OpTime>
-ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable(
- std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
- Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
-
- return AsyncTry([this, self = shared_from_this()] {
- auto opCtxHolder = cc().makeOperationContext();
- auto opCtx = opCtxHolder.get();
-
- uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- })
- .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
- .withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, token);
-}
-
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
@@ -888,7 +852,22 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle
LOGV2(6236606,
"Marking shard split as garbage-collectable.",
"migrationId"_attr = _migrationId);
- return _markStateDocAsGarbageCollectable(executor, token);
+
+ stdx::lock_guard<Latch> lg(_mutex);
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+
+ return AsyncTry([this, self = shared_from_this()] {
+ auto opCtxHolder = cc().makeOperationContext();
+ auto opCtx = opCtxHolder.get();
+
+ uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ })
+ .until(
+ [](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+ .withBackoffBetweenIterations(kExponentialBackoff)
+ .on(**executor, token);
})
.then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
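
Note on the last two hunks: the standalone _markStateDocAsGarbageCollectable helper is removed and its body inlined into _waitForForgetCmdThenMarkGarbageCollectible. A condensed view of the resulting flow, assembled from the fragments above; persistStateDocAndGetOpTime() is a placeholder for the lambda body shown in the patch:

    // Stamp the document with an expiration time, retry the update with exponential backoff
    // until it succeeds, then wait for the resulting opTime to be majority committed.
    _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
                          Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
    return AsyncTry([&] { return persistStateDocAndGetOpTime(); })
        .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
        .withBackoffBetweenIterations(kExponentialBackoff)
        .on(**executor, token)
        .then([this, executor, token](repl::OpTime opTime) {
            return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
        });
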