Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r--   src/mongo/db/serverless/shard_split_donor_service.cpp   137
1 file changed, 58 insertions(+), 79 deletions(-)
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index d240f9c5496..defb74e9a3c 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -116,15 +116,8 @@ void insertTenantAccessBlocker(WithLock lk,
                                ShardSplitDonorDocument donorStateDoc) {
     auto optionalTenants = donorStateDoc.getTenantIds();
     invariant(optionalTenants);
-
-    auto recipientTagName = donorStateDoc.getRecipientTagName();
-    auto recipientSetName = donorStateDoc.getRecipientSetName();
-    invariant(recipientTagName);
-    invariant(recipientSetName);
-
-    auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
-    auto recipientConnectionString =
-        serverless::makeRecipientConnectionString(config, *recipientTagName, *recipientSetName);
+    auto recipientConnectionString = donorStateDoc.getRecipientConnectionString();
+    invariant(recipientConnectionString);
 
     for (const auto& tenantId : optionalTenants.get()) {
         auto mtab = std::make_shared<TenantMigrationDonorAccessBlocker>(
@@ -132,7 +125,7 @@ void insertTenantAccessBlocker(WithLock lk,
             donorStateDoc.getId(),
             tenantId.toString(),
             MigrationProtocolEnum::kMultitenantMigrations,
-            recipientConnectionString.toString());
+            recipientConnectionString->toString());
 
         TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()).add(tenantId, mtab);
 
@@ -154,13 +147,7 @@ namespace detail {
 SemiFuture<void> makeRecipientAcceptSplitFuture(
     std::shared_ptr<executor::TaskExecutor> taskExecutor,
     const CancellationToken& token,
-    const StringData& recipientTagName,
-    const StringData& recipientSetName) {
-
-    auto replCoord = repl::ReplicationCoordinator::get(cc().getServiceContext());
-    invariant(replCoord);
-    auto recipientConnectionString = serverless::makeRecipientConnectionString(
-        replCoord->getConfig(), recipientTagName, recipientSetName);
+    const ConnectionString& recipientConnectionString) {
 
     // build a vector of single server discovery monitors to listen for heartbeats
     auto eventsPublisher = std::make_shared<sdam::TopologyEventsPublisher>(taskExecutor);
@@ -257,6 +244,8 @@ ExecutorFuture<void> ShardSplitDonorService::_rebuildService(
     return _createStateDocumentTTLIndex(executor, token);
 }
 
+boost::optional<TaskExecutorPtr>
+    ShardSplitDonorService::DonorStateMachine::_splitAcceptanceTaskExecutorForTest;
 ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
     ServiceContext* serviceContext,
     ShardSplitDonorService* splitService,
@@ -383,15 +372,13 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
           "id"_attr = _migrationId,
           "timeout"_attr = repl::shardSplitTimeoutMS.load());
 
-    _createReplicaSetMonitor(abortToken);
-
     _decisionPromise.setWith([&] {
         return ExecutorFuture(**executor)
             .then([this, executor, primaryToken] {
                 // Note we do not use the abort split token here because the abortShardSplit
                 // command waits for a decision to be persisted which will not happen if
                 // inserting the initial state document fails.
-                return _writeInitialDocument(executor, primaryToken);
+                return _enterBlockingOrAbortedState(executor, primaryToken);
             })
             .then([this, executor, abortToken] {
                 checkForTokenInterrupt(abortToken);
@@ -561,7 +548,7 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
     }
 
     return ExecutorFuture(**executor)
-        .then([&]() { return _recipientAcceptedSplit.getFuture(); })
+        .then([&]() { return _splitAcceptancePromise.getFuture(); })
         .then([this, executor, token] {
             LOGV2(6142503,
                   "Recipient has accepted the split, committing decision.",
@@ -574,13 +561,12 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipien
     });
 }
 
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDocument(
+ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
     const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryServiceToken) {
     ShardSplitDonorStateEnum nextState;
     {
         stdx::lock_guard<Latch> lg(_mutex);
-
         if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
             if (isAbortedDocumentPersistent(lg, _stateDoc)) {
                 // Node has step up and created an instance using a document in abort state. No need
@@ -594,14 +580,42 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_writeInitialDoc
             _abortReason->serializeErrorToBSON(&bob);
             _stateDoc.setAbortReason(bob.obj());
             nextState = ShardSplitDonorStateEnum::kAborted;
+        } else {
+            auto recipientTagName = _stateDoc.getRecipientTagName();
+            invariant(recipientTagName);
+            auto recipientSetName = _stateDoc.getRecipientSetName();
+            invariant(recipientSetName);
+            auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+            auto recipientConnectionString = serverless::makeRecipientConnectionString(
+                config, *recipientTagName, *recipientSetName);
+
+            // Always start the replica set monitor if we haven't reached a decision yet
+            _splitAcceptancePromise.setWith([&]() -> Future<void> {
+                if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
+                    MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
+                    return SemiFuture<void>::makeReady().unsafeToInlineFuture();
+                }
+
+                // Optionally select a task executor for unit testing
+                auto executor = _splitAcceptanceTaskExecutorForTest
+                    ? *_splitAcceptanceTaskExecutorForTest
+                    : _shardSplitService->getInstanceCleanupExecutor();
+
+                return detail::makeRecipientAcceptSplitFuture(
+                           executor, primaryServiceToken, recipientConnectionString)
+                    .unsafeToInlineFuture();
+            });
+
+            if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
+                // Node has step up and resumed a shard split. No need to write the document as it
+                // already exists.
+                return ExecutorFuture(**executor);
+            }
 
-        } else if (_stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized) {
+            // Otherwise, record the recipient connection string
+            _stateDoc.setRecipientConnectionString(recipientConnectionString);
             _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
             nextState = ShardSplitDonorStateEnum::kBlocking;
-        } else {
-            // Node has step up and resumed a shard split. No need to write the document as it
-            // already exists.
-            return ExecutorFuture(**executor);
         }
     }
 
@@ -756,36 +770,6 @@ void ShardSplitDonorService::DonorStateMachine::_initiateTimeout(
         .semi();
 }
 
-void ShardSplitDonorService::DonorStateMachine::_createReplicaSetMonitor(
-    const CancellationToken& abortToken) {
-    {
-        stdx::lock_guard<Latch> lg(_mutex);
-        if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
-            return;
-        }
-    }
-
-    auto future = [&]() {
-        stdx::lock_guard<Latch> lg(_mutex);
-        if (MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {  // Test-only.
-            return SemiFuture<void>::makeReady();
-        }
-
-        auto recipientTagName = _stateDoc.getRecipientTagName();
-        auto recipientSetName = _stateDoc.getRecipientSetName();
-        invariant(recipientTagName);
-        invariant(recipientSetName);
-
-        return detail::makeRecipientAcceptSplitFuture(
-            _shardSplitService->getInstanceCleanupExecutor(),
-            abortToken,
-            *recipientTagName,
-            *recipientSetName);
-    }();
-
-    _recipientAcceptedSplit.setFrom(std::move(future).unsafeToInlineFuture());
-}
-
 ExecutorFuture<ShardSplitDonorService::DonorStateMachine::DurableState>
 ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
     StatusWith<DurableState> statusWithState,
@@ -846,26 +830,6 @@ ShardSplitDonorService::DonorStateMachine::_handleErrorOrEnterAbortedState(
     });
 }
 
-ExecutorFuture<repl::OpTime>
-ShardSplitDonorService::DonorStateMachine::_markStateDocAsGarbageCollectable(
-    std::shared_ptr<executor::ScopedTaskExecutor> executor, const CancellationToken& token) {
-    stdx::lock_guard<Latch> lg(_mutex);
-
-    _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
-                          Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
-
-    return AsyncTry([this, self = shared_from_this()] {
-               auto opCtxHolder = cc().makeOperationContext();
-               auto opCtx = opCtxHolder.get();
-
-               uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
-               return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
-           })
-        .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
-        .withBackoffBetweenIterations(kExponentialBackoff)
-        .on(**executor, token);
-}
-
 ExecutorFuture<void>
 ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageCollectible(
     const ScopedTaskExecutorPtr& executor, const CancellationToken& token) {
@@ -888,7 +852,22 @@ ShardSplitDonorService::DonorStateMachine::_waitForForgetCmdThenMarkGarbageColle
             LOGV2(6236606,
                   "Marking shard split as garbage-collectable.",
                   "migrationId"_attr = _migrationId);
-            return _markStateDocAsGarbageCollectable(executor, token);
+
+            stdx::lock_guard<Latch> lg(_mutex);
+            _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+                                  Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+
+            return AsyncTry([this, self = shared_from_this()] {
+                       auto opCtxHolder = cc().makeOperationContext();
+                       auto opCtx = opCtxHolder.get();
+
+                       uassertStatusOK(serverless::updateStateDoc(opCtx, _stateDoc));
+                       return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+                   })
+                .until(
+                    [](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
+                .withBackoffBetweenIterations(kExponentialBackoff)
+                .on(**executor, token);
         })
        .then([this, self = shared_from_this(), executor, token](repl::OpTime opTime) {
            return _waitForMajorityWriteConcern(executor, std::move(opTime), token);
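
The patch folds the old _createReplicaSetMonitor step into _enterBlockingOrAbortedState, so the split-acceptance promise is now fulfilled exactly once while the state-machine mutex is held: with an already-ready future when a decision was already reached (or when the skipShardSplitWaitForSplitAcceptance failpoint is active), and with the future returned by detail::makeRecipientAcceptSplitFuture otherwise. Below is a minimal standalone sketch of that fulfill-once pattern, using std::promise and std::shared_future as stand-ins for MongoDB's SharedPromise and future types; the enum and function names here are invented for illustration, not the tree's API.

#include <chrono>
#include <future>
#include <iostream>
#include <thread>

// Illustrative stand-in for ShardSplitDonorStateEnum.
enum class SplitState { kUninitialized, kBlocking, kCommitted, kAborted };

// Fulfill-once analogue of _splitAcceptancePromise.setWith(...): resolve
// immediately when a decision was already reached (or the wait is skipped
// for tests), otherwise resolve from a background task standing in for the
// recipient heartbeat monitor built by makeRecipientAcceptSplitFuture.
std::shared_future<void> makeSplitAcceptanceFuture(SplitState state, bool skipWaitForTest) {
    std::promise<void> promise;
    auto future = promise.get_future().share();

    if (state > SplitState::kBlocking || skipWaitForTest) {
        promise.set_value();  // decision already reached: hand back a ready future
        return future;
    }

    std::thread([p = std::move(promise)]() mutable {
        // Pretend the recipient set reports healthy heartbeats after a delay.
        std::this_thread::sleep_for(std::chrono::milliseconds(50));
        p.set_value();
    }).detach();
    return future;
}

int main() {
    auto accepted = makeSplitAcceptanceFuture(SplitState::kBlocking, /*skipWaitForTest=*/false);
    accepted.wait();
    std::cout << "recipient accepted the split\n";
    return 0;
}

In the patch itself the waiter obtains the future via _splitAcceptancePromise.getFuture(), so setting the promise exactly once, before any continuation runs, is what makes this refactor safe.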
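The new static member _splitAcceptanceTaskExecutorForTest lets unit tests substitute the executor that drives split-acceptance monitoring; production code falls back to _shardSplitService->getInstanceCleanupExecutor(). A hypothetical sketch of that static-override pattern with std::optional follows; the Executor type and every name in it are invented for illustration.

#include <iostream>
#include <optional>
#include <string>

// Hypothetical executor type; in the tree this is TaskExecutorPtr.
struct Executor {
    std::string name;
};

class DonorStateMachine {
public:
    // Mirrors _splitAcceptanceTaskExecutorForTest: unset in production,
    // assigned by unit tests to take over the acceptance-monitoring work.
    static std::optional<Executor> splitAcceptanceExecutorForTest;

    Executor pickSplitAcceptanceExecutor() const {
        return splitAcceptanceExecutorForTest ? *splitAcceptanceExecutorForTest
                                              : Executor{"instance-cleanup-executor"};
    }
};

std::optional<Executor> DonorStateMachine::splitAcceptanceExecutorForTest;

int main() {
    DonorStateMachine machine;
    std::cout << machine.pickSplitAcceptanceExecutor().name << "\n";  // production default
    DonorStateMachine::splitAcceptanceExecutorForTest = Executor{"test-executor"};
    std::cout << machine.pickSplitAcceptanceExecutor().name << "\n";  // test override
    return 0;
}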
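Finally, _markStateDocAsGarbageCollectable is inlined into _waitForForgetCmdThenMarkGarbageCollectible, keeping the AsyncTry(...).until(...).withBackoffBetweenIterations(kExponentialBackoff) loop that persists expireAt on the state document until the update succeeds. A simplified synchronous analogue of that retry-with-backoff helper, assuming a bool-returning work item in place of StatusWith<repl::OpTime>:

#include <algorithm>
#include <chrono>
#include <iostream>
#include <thread>

// Simplified synchronous analogue of
// AsyncTry(work).until(ok).withBackoffBetweenIterations(kExponentialBackoff):
// rerun `work` until it reports success, doubling the pause between attempts
// up to a cap.
template <typename Work>
void retryWithExponentialBackoff(Work work,
                                 std::chrono::milliseconds initial = std::chrono::milliseconds{10},
                                 std::chrono::milliseconds cap = std::chrono::milliseconds{1000}) {
    auto delay = initial;
    while (!work()) {
        std::this_thread::sleep_for(delay);
        delay = std::min(delay * 2, cap);
    }
}

int main() {
    int attempts = 0;
    // Stand-in for the state-document update, which may hit transient write errors.
    retryWithExponentialBackoff([&] { return ++attempts == 3; });
    std::cout << "state document updated after " << attempts << " attempts\n";
    return 0;
}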