diff options
Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 510 |
1 files changed, 260 insertions, 250 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index f37a9416f5e..deb78f1779b 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -68,54 +68,16 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterMarkingStateGarbageCollectable); MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSplitConfigRemoval); MONGO_FAIL_POINT_DEFINE(skipShardSplitRecipientCleanup); MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeLeavingBlockingState); +MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterUpdatingToCommittedState); +MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSendingStepUpToRecipients); +MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterReceivingAbortCmd); const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max()); -bool shouldStopInsertingDonorStateDoc(Status status) { - return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress; -} - -void setStateDocTimestamps(WithLock, - ShardSplitDonorStateEnum nextState, - repl::OpTime time, - ShardSplitDonorDocument& stateDoc) { - switch (nextState) { - case ShardSplitDonorStateEnum::kUninitialized: - break; - case ShardSplitDonorStateEnum::kBlocking: - stateDoc.setBlockTimestamp(time.getTimestamp()); - break; - case ShardSplitDonorStateEnum::kAborted: - stateDoc.setCommitOrAbortOpTime(time); - break; - case ShardSplitDonorStateEnum::kCommitted: - stateDoc.setCommitOrAbortOpTime(time); - break; - default: - MONGO_UNREACHABLE; - } -} - bool isAbortedDocumentPersistent(WithLock, ShardSplitDonorDocument& stateDoc) { return !!stateDoc.getAbortReason(); } -void setMtabToBlockingForTenants(ServiceContext* context, - OperationContext* opCtx, - const std::vector<StringData>& tenantIds) { - // Start blocking writes before getting an oplog slot to guarantee no - // writes to the tenant's data can commit with a timestamp after the - // block timestamp. - for (const auto& tenantId : tenantIds) { - auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(context, - tenantId); - invariant(mtab); - mtab->startBlockingWrites(); - - opCtx->recoveryUnit()->onRollback([mtab] { mtab->rollBackStartBlocking(); }); - } -} - void checkForTokenInterrupt(const CancellationToken& token) { uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled()); } @@ -304,11 +266,14 @@ ShardSplitDonorService::DonorStateMachine::DonorStateMachine( void ShardSplitDonorService::DonorStateMachine::tryAbort() { LOGV2(6086502, "Received 'abortShardSplit' command.", "id"_attr = _migrationId); - stdx::lock_guard<Latch> lg(_mutex); - _abortRequested = true; - if (_abortSource) { - _abortSource->cancel(); + { + stdx::lock_guard<Latch> lg(_mutex); + _abortRequested = true; + if (_abortSource) { + _abortSource->cancel(); + } } + pauseShardSplitAfterReceivingAbortCmd.pauseWhileSet(); } void ShardSplitDonorService::DonorStateMachine::tryForget() { @@ -417,19 +382,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( // Note we do not use the abort split token here because the abortShardSplit // command waits for a decision to be persisted which will not happen if // inserting the initial state document fails. - if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) { - pauseShardSplitBeforeBlockingState.pauseWhileSet(); - } - return _enterBlockingOrAbortedState(executor, primaryToken, abortToken); + return _enterAbortIndexBuildsOrAbortedState(executor, primaryToken, abortToken); + }) + .then([this, executor, abortToken] { + // Start tracking the abortToken for killing operation contexts + _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); + return _abortIndexBuildsAndEnterBlockingState(executor, abortToken); }) .then([this, executor, abortToken, criticalSectionTimer] { criticalSectionTimer->reset(); - checkForTokenInterrupt(abortToken); - _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor); - _abortIndexBuilds(abortToken); - }) - .then([this, executor, abortToken] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get()); @@ -440,7 +402,12 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run( return _applySplitConfigToDonor(executor, abortToken); }) .then([this, executor, abortToken] { - return _waitForRecipientToAcceptSplitAndTriggerElection(executor, abortToken); + return _waitForRecipientToAcceptSplit(executor, abortToken); + }) + .then([this, executor, primaryToken] { + // only cancel operations on stepdown from here out + _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor); + return _triggerElectionAndEnterCommitedState(executor, primaryToken); }) // anchor ensures the instance will still exists even if the primary stepped down .onCompletion([this, @@ -572,6 +539,143 @@ bool ShardSplitDonorService::DonorStateMachine::_hasInstalledSplitConfig(WithLoc config.getRecipientConfig()->getReplSetName() == *_stateDoc.getRecipientSetName(); } +ConnectionString ShardSplitDonorService::DonorStateMachine::_setupAcceptanceMonitoring( + WithLock lock, const CancellationToken& abortToken) { + auto recipientConnectionString = [stateDoc = _stateDoc]() { + if (stateDoc.getRecipientConnectionString()) { + return *stateDoc.getRecipientConnectionString(); + } + + auto recipientTagName = stateDoc.getRecipientTagName(); + invariant(recipientTagName); + auto recipientSetName = stateDoc.getRecipientSetName(); + invariant(recipientSetName); + auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); + return serverless::makeRecipientConnectionString( + config, *recipientTagName, *recipientSetName); + }(); + + // Always start the replica set monitor if we haven't reached a decision yet + _splitAcceptancePromise.setWith([&]() -> Future<void> { + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking || + MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { + return SemiFuture<void>::makeReady().unsafeToInlineFuture(); + } + + // Optionally select a task executor for unit testing + auto executor = _splitAcceptanceTaskExecutorForTest + ? *_splitAcceptanceTaskExecutorForTest + : _shardSplitService->getInstanceCleanupExecutor(); + + LOGV2(6142508, + "Monitoring recipient nodes for split acceptance.", + "id"_attr = _migrationId, + "recipientConnectionString"_attr = recipientConnectionString); + + return detail::makeRecipientAcceptSplitFuture( + executor, abortToken, recipientConnectionString, _migrationId) + .unsafeToInlineFuture(); + }); + + return recipientConnectionString; +} + +ExecutorFuture<void> +ShardSplitDonorService::DonorStateMachine::_enterAbortIndexBuildsOrAbortedState( + const ScopedTaskExecutorPtr& executor, + const CancellationToken& primaryToken, + const CancellationToken& abortToken) { + ShardSplitDonorStateEnum nextState; + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { + if (isAbortedDocumentPersistent(lg, _stateDoc)) { + // Node has step up and created an instance using a document in abort state. No + // need to write the document as it already exists. + return ExecutorFuture(**executor); + } + + _abortReason = + Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'."); + BSONObjBuilder bob; + _abortReason->serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + + Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); + nextState = ShardSplitDonorStateEnum::kAborted; + + LOGV2(6670500, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId()); + } else { + // Always set up acceptance monitoring. + auto recipientConnectionString = _setupAcceptanceMonitoring(lg, abortToken); + + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) { + // Node has stepped up and resumed a shard split. No need to write the document as + // it already exists. + return ExecutorFuture(**executor); + } + + _stateDoc.setRecipientConnectionString(recipientConnectionString); + nextState = ShardSplitDonorStateEnum::kAbortingIndexBuilds; + + LOGV2( + 6670501, "Entering 'aborting index builds' state.", "id"_attr = _stateDoc.getId()); + } + } + + return _updateStateDocument(executor, primaryToken, nextState) + .then([this, executor, primaryToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken); + }) + .then([this, executor, nextState]() { + uassert(ErrorCodes::TenantMigrationAborted, + "Shard split operation aborted.", + nextState != ShardSplitDonorStateEnum::kAborted); + }); +} + +ExecutorFuture<void> +ShardSplitDonorService::DonorStateMachine::_abortIndexBuildsAndEnterBlockingState( + const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { + checkForTokenInterrupt(abortToken); + + boost::optional<std::vector<StringData>> tenantIds; + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kAbortingIndexBuilds) { + return ExecutorFuture(**executor); + } + + tenantIds = _stateDoc.getTenantIds(); + invariant(tenantIds); + } + + LOGV2(6436100, "Aborting index builds for shard split.", "id"_attr = _migrationId); + + // Abort any in-progress index builds. No new index builds can start while we are doing this + // because the mtab prevents it. + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get()); + for (const auto& tenantId : *tenantIds) { + indexBuildsCoordinator->abortTenantIndexBuilds( + opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split"); + } + + if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) { + pauseShardSplitBeforeBlockingState.pauseWhileSet(); + } + + { + stdx::lock_guard<Latch> lg(_mutex); + LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId()); + } + + return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking) + .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); + }); +} + ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { @@ -661,7 +765,7 @@ ExecutorFuture<void> sendStepUpToRecipient(const HostAndPort recipient, return AsyncTry([executor, recipient, token] { executor::RemoteCommandRequest request( recipient, "admin", BSON("replSetStepUp" << 1 << "skipDryRun" << true), nullptr); - + pauseShardSplitBeforeSendingStepUpToRecipients.pauseWhileSet(); return executor->scheduleRemoteCommand(request, token) .then([](const auto& response) { return getStatusFromCommandResult(response.data); @@ -676,10 +780,26 @@ ExecutorFuture<void> sendStepUpToRecipient(const HostAndPort recipient, .on(executor, token); } -ExecutorFuture<void> -ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTriggerElection( +ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplit( const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) { + checkForTokenInterrupt(abortToken); + { + stdx::lock_guard<Latch> lg(_mutex); + if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) { + return ExecutorFuture(**executor); + } + } + + LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId); + + return ExecutorFuture(**executor).then([&]() { return _splitAcceptancePromise.getFuture(); }); +} + +ExecutorFuture<void> +ShardSplitDonorService::DonorStateMachine::_triggerElectionAndEnterCommitedState( + const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) { + checkForTokenInterrupt(primaryToken); std::vector<HostAndPort> recipients; { @@ -699,10 +819,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig auto remoteCommandExecutor = _splitAcceptanceTaskExecutorForTest ? *_splitAcceptanceTaskExecutorForTest : **executor; - LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId); - return ExecutorFuture(**executor) - .then([&]() { return _splitAcceptancePromise.getFuture(); }) .then([this] { auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); if (MONGO_unlikely(pauseShardSplitBeforeLeavingBlockingState.shouldFail())) { @@ -723,7 +840,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig uasserted(ErrorCodes::InternalError, "simulate a shard split error"); } }) - .then([this, recipients, abortToken, remoteCommandExecutor] { + .then([this, recipients, primaryToken, remoteCommandExecutor] { LOGV2(6493901, "Triggering an election after recipient has accepted the split.", "id"_attr = _migrationId); @@ -733,14 +850,16 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig // succeed). Selecting a random node has a 2/3 chance to succeed for replSetStepUp. If // the first command fail, we know this node is the most out-of-date. Therefore we // select the next node and we know the first node selected will vote for the second. - return sendStepUpToRecipient(recipients[0], remoteCommandExecutor, abortToken) - .onCompletion([this, recipients, remoteCommandExecutor, abortToken](Status status) { - if (status.isOK()) { - return ExecutorFuture<void>(remoteCommandExecutor, status); - } + return sendStepUpToRecipient(recipients[0], remoteCommandExecutor, primaryToken) + .onCompletion( + [this, recipients, remoteCommandExecutor, primaryToken](Status status) { + if (status.isOK()) { + return ExecutorFuture<void>(remoteCommandExecutor, status); + } - return sendStepUpToRecipient(recipients[1], remoteCommandExecutor, abortToken); - }) + return sendStepUpToRecipient( + recipients[1], remoteCommandExecutor, primaryToken); + }) .onCompletion([this](Status replSetStepUpStatus) { if (!replSetStepUpStatus.isOK()) { LOGV2(6493904, @@ -756,180 +875,93 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig }); }) .thenRunOn(**executor) - .then([this, executor, abortToken]() { + .then([this, executor, primaryToken]() { LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId()); + auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); + pauseShardSplitAfterUpdatingToCommittedState.pauseWhileSet(opCtx.get()); - return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kCommitted) - .then([this, executor, abortToken](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken); + return _updateStateDocument( + executor, primaryToken, ShardSplitDonorStateEnum::kCommitted) + .then([this, executor, primaryToken](repl::OpTime opTime) { + return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken); }); }); } -ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState( - const ScopedTaskExecutorPtr& executor, - const CancellationToken& primaryToken, - const CancellationToken& abortToken) { - ShardSplitDonorStateEnum nextState; - { - stdx::lock_guard<Latch> lg(_mutex); - if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) { - if (isAbortedDocumentPersistent(lg, _stateDoc)) { - // Node has step up and created an instance using a document in abort state. No - // need to write the document as it already exists. - return ExecutorFuture(**executor); - } - - _abortReason = - Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'."); - BSONObjBuilder bob; - _abortReason->serializeErrorToBSON(&bob); - _stateDoc.setAbortReason(bob.obj()); - _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() + - Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()}); - nextState = ShardSplitDonorStateEnum::kAborted; - - LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId()); - } else { - auto recipientConnectionString = [stateDoc = _stateDoc]() { - if (stateDoc.getRecipientConnectionString()) { - return *stateDoc.getRecipientConnectionString(); - } - - auto recipientTagName = stateDoc.getRecipientTagName(); - invariant(recipientTagName); - auto recipientSetName = stateDoc.getRecipientSetName(); - invariant(recipientSetName); - auto config = - repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig(); - return serverless::makeRecipientConnectionString( - config, *recipientTagName, *recipientSetName); - }(); - - // Always start the replica set monitor if we haven't reached a decision yet - _splitAcceptancePromise.setWith([&]() -> Future<void> { - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking || - MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) { - return SemiFuture<void>::makeReady().unsafeToInlineFuture(); - } - - // Optionally select a task executor for unit testing - auto executor = _splitAcceptanceTaskExecutorForTest - ? *_splitAcceptanceTaskExecutorForTest - : _shardSplitService->getInstanceCleanupExecutor(); - - LOGV2(6142508, - "Monitoring recipient nodes for split acceptance.", - "id"_attr = _migrationId, - "recipientConnectionString"_attr = recipientConnectionString); - - return detail::makeRecipientAcceptSplitFuture( - executor, abortToken, recipientConnectionString, _migrationId) - .unsafeToInlineFuture(); - }); - - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) { - // Node has step up and resumed a shard split. No need to write the document as - // it already exists. - return ExecutorFuture(**executor); - } - - // Otherwise, record the recipient connection string - _stateDoc.setRecipientConnectionString(recipientConnectionString); - _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking); - nextState = ShardSplitDonorStateEnum::kBlocking; - - LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId()); - } - } - - return AsyncTry([this, nextState, uuid = _migrationId]() { - auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto opCtx = opCtxHolder.get(); - - AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - - writeConflictRetry( - opCtx, "ShardSplitDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] { - const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); - const auto getUpdatedStateDocBson = [&]() { - stdx::lock_guard<Latch> lg(_mutex); - return _stateDoc.toBSON(); - }; - - WriteUnitOfWork wuow(opCtx); - if (nextState == ShardSplitDonorStateEnum::kBlocking) { - stdx::lock_guard<Latch> lg(_mutex); - - insertTenantAccessBlocker(lg, opCtx, _stateDoc); - - auto tenantIds = _stateDoc.getTenantIds(); - invariant(tenantIds); - setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get()); - } - - // Reserve an opTime for the write. - auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - setStateDocTimestamps( - stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); - - auto updateResult = Helpers::upsert(opCtx, - _stateDocumentsNS.ns(), - filter, - getUpdatedStateDocBson(), - /*fromMigrate=*/false); - - - // We only want to insert, not modify, document - invariant(updateResult.numMatched == 0); - wuow.commit(); - }); - - return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); - }) - .until([](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); - }) - .withBackoffBetweenIterations(kExponentialBackoff) - .on(**executor, primaryToken) - .then([this, executor, primaryToken](repl::OpTime opTime) { - return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken); - }) - .then([this, executor, nextState]() { - uassert(ErrorCodes::TenantMigrationAborted, - "Shard split operation aborted.", - nextState != ShardSplitDonorStateEnum::kAborted); - }); -} - ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateStateDocument( const ScopedTaskExecutorPtr& executor, const CancellationToken& token, ShardSplitDonorStateEnum nextState) { - auto tenantIds = [&]() { + auto [tenantIds, isInsert] = [&]() { stdx::lock_guard<Latch> lg(_mutex); - _stateDoc.setState(nextState); - - return _stateDoc.getTenantIds(); + auto isInsert = _stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized || + _stateDoc.getState() == ShardSplitDonorStateEnum::kAborted; + return std::make_pair(_stateDoc.getTenantIds(), isInsert); }(); - return AsyncTry([this, tenantIds = std::move(tenantIds), uuid = _migrationId, nextState] { + return AsyncTry([this, + tenantIds = std::move(tenantIds), + isInsert = isInsert, + uuid = _migrationId, + nextState] { auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc()); auto opCtx = opCtxHolder.get(); AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX); - uassert(ErrorCodes::NamespaceNotFound, - str::stream() << _stateDocumentsNS.ns() << " does not exist", - collection); + + if (!isInsert) { + uassert(ErrorCodes::NamespaceNotFound, + str::stream() << _stateDocumentsNS.ns() << " does not exist", + collection); + } writeConflictRetry( - opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] { + opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() { WriteUnitOfWork wuow(opCtx); + if (nextState == ShardSplitDonorStateEnum::kBlocking) { + // Start blocking writes before getting an oplog slot to guarantee no + // writes to the tenant's data can commit with a timestamp after the + // block timestamp. + for (const auto& tenantId : *tenantIds) { + auto mtab = tenant_migration_access_blocker:: + getTenantMigrationDonorAccessBlocker(_serviceContext, tenantId); + invariant(mtab); + mtab->startBlockingWrites(); + + opCtx->recoveryUnit()->onRollback( + [mtab] { mtab->rollBackStartBlocking(); }); + } + } + // Reserve an opTime for the write. auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - setStateDocTimestamps( - stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc); + { + stdx::lock_guard<Latch> lg(_mutex); + _stateDoc.setState(nextState); + switch (nextState) { + case ShardSplitDonorStateEnum::kUninitialized: + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; + case ShardSplitDonorStateEnum::kBlocking: + _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp()); + break; + case ShardSplitDonorStateEnum::kCommitted: + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kAborted: { + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + + invariant(_abortReason); + BSONObjBuilder bob; + _abortReason.get().serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + break; + } + default: + MONGO_UNREACHABLE; + } + } const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); const auto updatedStateDocBson = [&]() { @@ -942,15 +974,19 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS updatedStateDocBson, /*fromMigrate=*/false); - invariant(updateResult.numDocsModified == 1); + if (isInsert) { + invariant(!updateResult.existing); + invariant(!updateResult.upsertedId.isEmpty()); + } else { + invariant(updateResult.numDocsModified == 1); + } + wuow.commit(); }); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) - .until([](StatusWith<repl::OpTime> swOpTime) { - return shouldStopInsertingDonorStateDoc(swOpTime.getStatus()); - }) + .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); }) .withBackoffBetweenIterations(kExponentialBackoff) .on(**executor, token); } @@ -1148,30 +1184,4 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS .on(**executor, primaryToken) .ignoreValue(); } - -void ShardSplitDonorService::DonorStateMachine::_abortIndexBuilds( - const CancellationToken& abortToken) { - checkForTokenInterrupt(abortToken); - - boost::optional<std::vector<StringData>> tenantIds; - { - stdx::lock_guard<Latch> lg(_mutex); - if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) { - return; - } - tenantIds = _stateDoc.getTenantIds(); - invariant(tenantIds); - } - - LOGV2(6436100, "Aborting index build for shard split.", "id"_attr = _migrationId); - - // Before applying the split config, abort any in-progress index builds. No new index builds - // can start while we are doing this because the mtab prevents it. - auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc()); - auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get()); - for (const auto& tenantId : *tenantIds) { - indexBuildsCoordinator->abortTenantIndexBuilds( - opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split"); - } -} } // namespace mongo |