Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r--  src/mongo/db/serverless/shard_split_donor_service.cpp  510
1 file changed, 260 insertions(+), 250 deletions(-)
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index f37a9416f5e..deb78f1779b 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -68,54 +68,16 @@ MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterMarkingStateGarbageCollectable);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSplitConfigRemoval);
MONGO_FAIL_POINT_DEFINE(skipShardSplitRecipientCleanup);
MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeLeavingBlockingState);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterUpdatingToCommittedState);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitBeforeSendingStepUpToRecipients);
+MONGO_FAIL_POINT_DEFINE(pauseShardSplitAfterReceivingAbortCmd);
const Backoff kExponentialBackoff(Seconds(1), Milliseconds::max());
-bool shouldStopInsertingDonorStateDoc(Status status) {
- return status.isOK() || status == ErrorCodes::ConflictingOperationInProgress;
-}
-
-void setStateDocTimestamps(WithLock,
- ShardSplitDonorStateEnum nextState,
- repl::OpTime time,
- ShardSplitDonorDocument& stateDoc) {
- switch (nextState) {
- case ShardSplitDonorStateEnum::kUninitialized:
- break;
- case ShardSplitDonorStateEnum::kBlocking:
- stateDoc.setBlockTimestamp(time.getTimestamp());
- break;
- case ShardSplitDonorStateEnum::kAborted:
- stateDoc.setCommitOrAbortOpTime(time);
- break;
- case ShardSplitDonorStateEnum::kCommitted:
- stateDoc.setCommitOrAbortOpTime(time);
- break;
- default:
- MONGO_UNREACHABLE;
- }
-}
-
bool isAbortedDocumentPersistent(WithLock, ShardSplitDonorDocument& stateDoc) {
return !!stateDoc.getAbortReason();
}
-void setMtabToBlockingForTenants(ServiceContext* context,
- OperationContext* opCtx,
- const std::vector<StringData>& tenantIds) {
- // Start blocking writes before getting an oplog slot to guarantee no
- // writes to the tenant's data can commit with a timestamp after the
- // block timestamp.
- for (const auto& tenantId : tenantIds) {
- auto mtab = tenant_migration_access_blocker::getTenantMigrationDonorAccessBlocker(context,
- tenantId);
- invariant(mtab);
- mtab->startBlockingWrites();
-
- opCtx->recoveryUnit()->onRollback([mtab] { mtab->rollBackStartBlocking(); });
- }
-}
-
void checkForTokenInterrupt(const CancellationToken& token) {
uassert(ErrorCodes::CallbackCanceled, "Donor service interrupted", !token.isCanceled());
}
@@ -304,11 +266,14 @@ ShardSplitDonorService::DonorStateMachine::DonorStateMachine(
void ShardSplitDonorService::DonorStateMachine::tryAbort() {
LOGV2(6086502, "Received 'abortShardSplit' command.", "id"_attr = _migrationId);
- stdx::lock_guard<Latch> lg(_mutex);
- _abortRequested = true;
- if (_abortSource) {
- _abortSource->cancel();
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _abortRequested = true;
+ if (_abortSource) {
+ _abortSource->cancel();
+ }
}
+ pauseShardSplitAfterReceivingAbortCmd.pauseWhileSet();
}
void ShardSplitDonorService::DonorStateMachine::tryForget() {
@@ -417,19 +382,16 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
// Note we do not use the abort split token here because the abortShardSplit
// command waits for a decision to be persisted which will not happen if
// inserting the initial state document fails.
- if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) {
- pauseShardSplitBeforeBlockingState.pauseWhileSet();
- }
- return _enterBlockingOrAbortedState(executor, primaryToken, abortToken);
+ return _enterAbortIndexBuildsOrAbortedState(executor, primaryToken, abortToken);
+ })
+ .then([this, executor, abortToken] {
+ // Start tracking the abortToken for killing operation contexts
+ _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
+ return _abortIndexBuildsAndEnterBlockingState(executor, abortToken);
})
.then([this, executor, abortToken, criticalSectionTimer] {
criticalSectionTimer->reset();
- checkForTokenInterrupt(abortToken);
- _cancelableOpCtxFactory.emplace(abortToken, _markKilledExecutor);
- _abortIndexBuilds(abortToken);
- })
- .then([this, executor, abortToken] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
pauseShardSplitAfterBlocking.pauseWhileSet(opCtx.get());
@@ -440,7 +402,12 @@ SemiFuture<void> ShardSplitDonorService::DonorStateMachine::run(
return _applySplitConfigToDonor(executor, abortToken);
})
.then([this, executor, abortToken] {
- return _waitForRecipientToAcceptSplitAndTriggerElection(executor, abortToken);
+ return _waitForRecipientToAcceptSplit(executor, abortToken);
+ })
+ .then([this, executor, primaryToken] {
+            // From this point on, only cancel operations on stepdown (primaryToken).
+ _cancelableOpCtxFactory.emplace(primaryToken, _markKilledExecutor);
+ return _triggerElectionAndEnterCommitedState(executor, primaryToken);
})
// anchor ensures the instance will still exists even if the primary stepped down
.onCompletion([this,
@@ -572,6 +539,143 @@ bool ShardSplitDonorService::DonorStateMachine::_hasInstalledSplitConfig(WithLoc
config.getRecipientConfig()->getReplSetName() == *_stateDoc.getRecipientSetName();
}
+ConnectionString ShardSplitDonorService::DonorStateMachine::_setupAcceptanceMonitoring(
+ WithLock lock, const CancellationToken& abortToken) {
+ auto recipientConnectionString = [stateDoc = _stateDoc]() {
+ if (stateDoc.getRecipientConnectionString()) {
+ return *stateDoc.getRecipientConnectionString();
+ }
+
+ auto recipientTagName = stateDoc.getRecipientTagName();
+ invariant(recipientTagName);
+ auto recipientSetName = stateDoc.getRecipientSetName();
+ invariant(recipientSetName);
+ auto config = repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
+ return serverless::makeRecipientConnectionString(
+ config, *recipientTagName, *recipientSetName);
+ }();
+
+ // Always start the replica set monitor if we haven't reached a decision yet
+ _splitAcceptancePromise.setWith([&]() -> Future<void> {
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
+ MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
+ return SemiFuture<void>::makeReady().unsafeToInlineFuture();
+ }
+
+ // Optionally select a task executor for unit testing
+ auto executor = _splitAcceptanceTaskExecutorForTest
+ ? *_splitAcceptanceTaskExecutorForTest
+ : _shardSplitService->getInstanceCleanupExecutor();
+
+ LOGV2(6142508,
+ "Monitoring recipient nodes for split acceptance.",
+ "id"_attr = _migrationId,
+ "recipientConnectionString"_attr = recipientConnectionString);
+
+ return detail::makeRecipientAcceptSplitFuture(
+ executor, abortToken, recipientConnectionString, _migrationId)
+ .unsafeToInlineFuture();
+ });
+
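+    // Return the connection string so the caller can persist it in the state document.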
+ return recipientConnectionString;
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_enterAbortIndexBuildsOrAbortedState(
+ const ScopedTaskExecutorPtr& executor,
+ const CancellationToken& primaryToken,
+ const CancellationToken& abortToken) {
+ ShardSplitDonorStateEnum nextState;
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
+ if (isAbortedDocumentPersistent(lg, _stateDoc)) {
+                // Node has stepped up and created an instance using a document in the aborted
+                // state. No need to write the document as it already exists.
+ return ExecutorFuture(**executor);
+ }
+
+ _abortReason =
+ Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
+ BSONObjBuilder bob;
+ _abortReason->serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
+ Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
+ nextState = ShardSplitDonorStateEnum::kAborted;
+
+ LOGV2(6670500, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
+ } else {
+ // Always set up acceptance monitoring.
+ auto recipientConnectionString = _setupAcceptanceMonitoring(lg, abortToken);
+
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
+ // Node has stepped up and resumed a shard split. No need to write the document as
+ // it already exists.
+ return ExecutorFuture(**executor);
+ }
+
+ _stateDoc.setRecipientConnectionString(recipientConnectionString);
+ nextState = ShardSplitDonorStateEnum::kAbortingIndexBuilds;
+
+ LOGV2(
+ 6670501, "Entering 'aborting index builds' state.", "id"_attr = _stateDoc.getId());
+ }
+ }
+
+ return _updateStateDocument(executor, primaryToken, nextState)
+ .then([this, executor, primaryToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
+ })
+ .then([this, executor, nextState]() {
+ uassert(ErrorCodes::TenantMigrationAborted,
+ "Shard split operation aborted.",
+ nextState != ShardSplitDonorStateEnum::kAborted);
+ });
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_abortIndexBuildsAndEnterBlockingState(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+ checkForTokenInterrupt(abortToken);
+
+ boost::optional<std::vector<StringData>> tenantIds;
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kAbortingIndexBuilds) {
+ return ExecutorFuture(**executor);
+ }
+
+ tenantIds = _stateDoc.getTenantIds();
+ invariant(tenantIds);
+ }
+
+ LOGV2(6436100, "Aborting index builds for shard split.", "id"_attr = _migrationId);
+
+ // Abort any in-progress index builds. No new index builds can start while we are doing this
+ // because the mtab prevents it.
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get());
+ for (const auto& tenantId : *tenantIds) {
+ indexBuildsCoordinator->abortTenantIndexBuilds(
+ opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split");
+ }
+
+ if (MONGO_unlikely(pauseShardSplitBeforeBlockingState.shouldFail())) {
+ pauseShardSplitBeforeBlockingState.pauseWhileSet();
+ }
+
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
+ }
+
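+    // The transition to 'blocking' also engages the tenant access blockers and records the block
+    // timestamp; both happen inside _updateStateDocument when the next state is kBlocking.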
+ return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kBlocking)
+ .then([this, self = shared_from_this(), executor, abortToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
+ });
+}
+
ExecutorFuture<void>
ShardSplitDonorService::DonorStateMachine::_waitForRecipientToReachBlockTimestamp(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
@@ -661,7 +765,7 @@ ExecutorFuture<void> sendStepUpToRecipient(const HostAndPort recipient,
return AsyncTry([executor, recipient, token] {
executor::RemoteCommandRequest request(
recipient, "admin", BSON("replSetStepUp" << 1 << "skipDryRun" << true), nullptr);
-
+ pauseShardSplitBeforeSendingStepUpToRecipients.pauseWhileSet();
return executor->scheduleRemoteCommand(request, token)
.then([](const auto& response) {
return getStatusFromCommandResult(response.data);
@@ -676,10 +780,26 @@ ExecutorFuture<void> sendStepUpToRecipient(const HostAndPort recipient,
.on(executor, token);
}
-ExecutorFuture<void>
-ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTriggerElection(
+ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplit(
const ScopedTaskExecutorPtr& executor, const CancellationToken& abortToken) {
+
checkForTokenInterrupt(abortToken);
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
+ return ExecutorFuture(**executor);
+ }
+ }
+
+ LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
+
+ return ExecutorFuture(**executor).then([&]() { return _splitAcceptancePromise.getFuture(); });
+}
+
+ExecutorFuture<void>
+ShardSplitDonorService::DonorStateMachine::_triggerElectionAndEnterCommitedState(
+ const ScopedTaskExecutorPtr& executor, const CancellationToken& primaryToken) {
+ checkForTokenInterrupt(primaryToken);
std::vector<HostAndPort> recipients;
{
@@ -699,10 +819,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig
auto remoteCommandExecutor =
_splitAcceptanceTaskExecutorForTest ? *_splitAcceptanceTaskExecutorForTest : **executor;
- LOGV2(6142501, "Waiting for recipient to accept the split.", "id"_attr = _migrationId);
-
return ExecutorFuture(**executor)
- .then([&]() { return _splitAcceptancePromise.getFuture(); })
.then([this] {
auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
if (MONGO_unlikely(pauseShardSplitBeforeLeavingBlockingState.shouldFail())) {
@@ -723,7 +840,7 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig
uasserted(ErrorCodes::InternalError, "simulate a shard split error");
}
})
- .then([this, recipients, abortToken, remoteCommandExecutor] {
+ .then([this, recipients, primaryToken, remoteCommandExecutor] {
LOGV2(6493901,
"Triggering an election after recipient has accepted the split.",
"id"_attr = _migrationId);
@@ -733,14 +850,16 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig
// succeed). Selecting a random node has a 2/3 chance to succeed for replSetStepUp. If
// the first command fails, we know this node is the most out-of-date. Therefore we
// select the next node and we know the first node selected will vote for the second.
- return sendStepUpToRecipient(recipients[0], remoteCommandExecutor, abortToken)
- .onCompletion([this, recipients, remoteCommandExecutor, abortToken](Status status) {
- if (status.isOK()) {
- return ExecutorFuture<void>(remoteCommandExecutor, status);
- }
+ return sendStepUpToRecipient(recipients[0], remoteCommandExecutor, primaryToken)
+ .onCompletion(
+ [this, recipients, remoteCommandExecutor, primaryToken](Status status) {
+ if (status.isOK()) {
+ return ExecutorFuture<void>(remoteCommandExecutor, status);
+ }
- return sendStepUpToRecipient(recipients[1], remoteCommandExecutor, abortToken);
- })
+ return sendStepUpToRecipient(
+ recipients[1], remoteCommandExecutor, primaryToken);
+ })
.onCompletion([this](Status replSetStepUpStatus) {
if (!replSetStepUpStatus.isOK()) {
LOGV2(6493904,
@@ -756,180 +875,93 @@ ShardSplitDonorService::DonorStateMachine::_waitForRecipientToAcceptSplitAndTrig
});
})
.thenRunOn(**executor)
- .then([this, executor, abortToken]() {
+ .then([this, executor, primaryToken]() {
LOGV2(6142503, "Entering 'committed' state.", "id"_attr = _stateDoc.getId());
+ auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
+ pauseShardSplitAfterUpdatingToCommittedState.pauseWhileSet(opCtx.get());
- return _updateStateDocument(executor, abortToken, ShardSplitDonorStateEnum::kCommitted)
- .then([this, executor, abortToken](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), abortToken);
+ return _updateStateDocument(
+ executor, primaryToken, ShardSplitDonorStateEnum::kCommitted)
+ .then([this, executor, primaryToken](repl::OpTime opTime) {
+ return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
});
});
}
-ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_enterBlockingOrAbortedState(
- const ScopedTaskExecutorPtr& executor,
- const CancellationToken& primaryToken,
- const CancellationToken& abortToken) {
- ShardSplitDonorStateEnum nextState;
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() == ShardSplitDonorStateEnum::kAborted) {
- if (isAbortedDocumentPersistent(lg, _stateDoc)) {
- // Node has step up and created an instance using a document in abort state. No
- // need to write the document as it already exists.
- return ExecutorFuture(**executor);
- }
-
- _abortReason =
- Status(ErrorCodes::TenantMigrationAborted, "Aborted due to 'abortShardSplit'.");
- BSONObjBuilder bob;
- _abortReason->serializeErrorToBSON(&bob);
- _stateDoc.setAbortReason(bob.obj());
- _stateDoc.setExpireAt(_serviceContext->getFastClockSource()->now() +
- Milliseconds{repl::shardSplitGarbageCollectionDelayMS.load()});
- nextState = ShardSplitDonorStateEnum::kAborted;
-
- LOGV2(8423355, "Entering 'aborted' state.", "id"_attr = _stateDoc.getId());
- } else {
- auto recipientConnectionString = [stateDoc = _stateDoc]() {
- if (stateDoc.getRecipientConnectionString()) {
- return *stateDoc.getRecipientConnectionString();
- }
-
- auto recipientTagName = stateDoc.getRecipientTagName();
- invariant(recipientTagName);
- auto recipientSetName = stateDoc.getRecipientSetName();
- invariant(recipientSetName);
- auto config =
- repl::ReplicationCoordinator::get(cc().getServiceContext())->getConfig();
- return serverless::makeRecipientConnectionString(
- config, *recipientTagName, *recipientSetName);
- }();
-
- // Always start the replica set monitor if we haven't reached a decision yet
- _splitAcceptancePromise.setWith([&]() -> Future<void> {
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking ||
- MONGO_unlikely(skipShardSplitWaitForSplitAcceptance.shouldFail())) {
- return SemiFuture<void>::makeReady().unsafeToInlineFuture();
- }
-
- // Optionally select a task executor for unit testing
- auto executor = _splitAcceptanceTaskExecutorForTest
- ? *_splitAcceptanceTaskExecutorForTest
- : _shardSplitService->getInstanceCleanupExecutor();
-
- LOGV2(6142508,
- "Monitoring recipient nodes for split acceptance.",
- "id"_attr = _migrationId,
- "recipientConnectionString"_attr = recipientConnectionString);
-
- return detail::makeRecipientAcceptSplitFuture(
- executor, abortToken, recipientConnectionString, _migrationId)
- .unsafeToInlineFuture();
- });
-
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kUninitialized) {
- // Node has step up and resumed a shard split. No need to write the document as
- // it already exists.
- return ExecutorFuture(**executor);
- }
-
- // Otherwise, record the recipient connection string
- _stateDoc.setRecipientConnectionString(recipientConnectionString);
- _stateDoc.setState(ShardSplitDonorStateEnum::kBlocking);
- nextState = ShardSplitDonorStateEnum::kBlocking;
-
- LOGV2(8423358, "Entering 'blocking' state.", "id"_attr = _stateDoc.getId());
- }
- }
-
- return AsyncTry([this, nextState, uuid = _migrationId]() {
- auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto opCtx = opCtxHolder.get();
-
- AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
-
- writeConflictRetry(
- opCtx, "ShardSplitDonorInsertStateDoc", _stateDocumentsNS.ns(), [&] {
- const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
- const auto getUpdatedStateDocBson = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- return _stateDoc.toBSON();
- };
-
- WriteUnitOfWork wuow(opCtx);
- if (nextState == ShardSplitDonorStateEnum::kBlocking) {
- stdx::lock_guard<Latch> lg(_mutex);
-
- insertTenantAccessBlocker(lg, opCtx, _stateDoc);
-
- auto tenantIds = _stateDoc.getTenantIds();
- invariant(tenantIds);
- setMtabToBlockingForTenants(_serviceContext, opCtx, tenantIds.get());
- }
-
- // Reserve an opTime for the write.
- auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- setStateDocTimestamps(
- stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
-
- auto updateResult = Helpers::upsert(opCtx,
- _stateDocumentsNS.ns(),
- filter,
- getUpdatedStateDocBson(),
- /*fromMigrate=*/false);
-
-
- // We only want to insert, not modify, document
- invariant(updateResult.numMatched == 0);
- wuow.commit();
- });
-
- return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
- })
- .until([](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
- })
- .withBackoffBetweenIterations(kExponentialBackoff)
- .on(**executor, primaryToken)
- .then([this, executor, primaryToken](repl::OpTime opTime) {
- return _waitForMajorityWriteConcern(executor, std::move(opTime), primaryToken);
- })
- .then([this, executor, nextState]() {
- uassert(ErrorCodes::TenantMigrationAborted,
- "Shard split operation aborted.",
- nextState != ShardSplitDonorStateEnum::kAborted);
- });
-}
-
ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateStateDocument(
const ScopedTaskExecutorPtr& executor,
const CancellationToken& token,
ShardSplitDonorStateEnum nextState) {
- auto tenantIds = [&]() {
+ auto [tenantIds, isInsert] = [&]() {
stdx::lock_guard<Latch> lg(_mutex);
- _stateDoc.setState(nextState);
-
- return _stateDoc.getTenantIds();
+ auto isInsert = _stateDoc.getState() == ShardSplitDonorStateEnum::kUninitialized ||
+ _stateDoc.getState() == ShardSplitDonorStateEnum::kAborted;
+ return std::make_pair(_stateDoc.getTenantIds(), isInsert);
}();
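+
+    // 'isInsert' is true when the state document has not been persisted yet (the initial write,
+    // or an abort requested before that write); the upsert below then inserts rather than
+    // updates the document.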
- return AsyncTry([this, tenantIds = std::move(tenantIds), uuid = _migrationId, nextState] {
+ return AsyncTry([this,
+ tenantIds = std::move(tenantIds),
+ isInsert = isInsert,
+ uuid = _migrationId,
+ nextState] {
auto opCtxHolder = _cancelableOpCtxFactory->makeOperationContext(&cc());
auto opCtx = opCtxHolder.get();
AutoGetCollection collection(opCtx, _stateDocumentsNS, MODE_IX);
- uassert(ErrorCodes::NamespaceNotFound,
- str::stream() << _stateDocumentsNS.ns() << " does not exist",
- collection);
+
+ if (!isInsert) {
+ uassert(ErrorCodes::NamespaceNotFound,
+ str::stream() << _stateDocumentsNS.ns() << " does not exist",
+ collection);
+ }
writeConflictRetry(
- opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&] {
+ opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() {
WriteUnitOfWork wuow(opCtx);
+ if (nextState == ShardSplitDonorStateEnum::kBlocking) {
+ // Start blocking writes before getting an oplog slot to guarantee no
+ // writes to the tenant's data can commit with a timestamp after the
+ // block timestamp.
+ for (const auto& tenantId : *tenantIds) {
+ auto mtab = tenant_migration_access_blocker::
+ getTenantMigrationDonorAccessBlocker(_serviceContext, tenantId);
+ invariant(mtab);
+ mtab->startBlockingWrites();
+
+ opCtx->recoveryUnit()->onRollback(
+ [mtab] { mtab->rollBackStartBlocking(); });
+ }
+ }
+
// Reserve an opTime for the write.
auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- setStateDocTimestamps(
- stdx::lock_guard<Latch>{_mutex}, nextState, oplogSlot, _stateDoc);
+ {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _stateDoc.setState(nextState);
+ switch (nextState) {
+ case ShardSplitDonorStateEnum::kUninitialized:
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ break;
+ case ShardSplitDonorStateEnum::kBlocking:
+ _stateDoc.setBlockTimestamp(oplogSlot.getTimestamp());
+ break;
+ case ShardSplitDonorStateEnum::kCommitted:
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+ break;
+ case ShardSplitDonorStateEnum::kAborted: {
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+
+ invariant(_abortReason);
+ BSONObjBuilder bob;
+ _abortReason.get().serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ }
const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
const auto updatedStateDocBson = [&]() {
@@ -942,15 +974,19 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
updatedStateDocBson,
/*fromMigrate=*/false);
- invariant(updateResult.numDocsModified == 1);
+ if (isInsert) {
+ invariant(!updateResult.existing);
+ invariant(!updateResult.upsertedId.isEmpty());
+ } else {
+ invariant(updateResult.numDocsModified == 1);
+ }
+
wuow.commit();
});
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})
- .until([](StatusWith<repl::OpTime> swOpTime) {
- return shouldStopInsertingDonorStateDoc(swOpTime.getStatus());
- })
+ .until([](StatusWith<repl::OpTime> swOpTime) { return swOpTime.getStatus().isOK(); })
.withBackoffBetweenIterations(kExponentialBackoff)
.on(**executor, token);
}
@@ -1148,30 +1184,4 @@ ExecutorFuture<void> ShardSplitDonorService::DonorStateMachine::_cleanRecipientS
.on(**executor, primaryToken)
.ignoreValue();
}
-
-void ShardSplitDonorService::DonorStateMachine::_abortIndexBuilds(
- const CancellationToken& abortToken) {
- checkForTokenInterrupt(abortToken);
-
- boost::optional<std::vector<StringData>> tenantIds;
- {
- stdx::lock_guard<Latch> lg(_mutex);
- if (_stateDoc.getState() > ShardSplitDonorStateEnum::kBlocking) {
- return;
- }
- tenantIds = _stateDoc.getTenantIds();
- invariant(tenantIds);
- }
-
- LOGV2(6436100, "Aborting index build for shard split.", "id"_attr = _migrationId);
-
- // Before applying the split config, abort any in-progress index builds. No new index builds
- // can start while we are doing this because the mtab prevents it.
- auto opCtx = _cancelableOpCtxFactory->makeOperationContext(&cc());
- auto* indexBuildsCoordinator = IndexBuildsCoordinator::get(opCtx.get());
- for (const auto& tenantId : *tenantIds) {
- indexBuildsCoordinator->abortTenantIndexBuilds(
- opCtx.get(), MigrationProtocolEnum::kMultitenantMigrations, tenantId, "shard split");
- }
-}
} // namespace mongo