summaryrefslogtreecommitdiff
path: root/src/mongo/db/serverless/shard_split_donor_service.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r--src/mongo/db/serverless/shard_split_donor_service.cpp196
1 files changed, 96 insertions, 100 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp
index 0939a556a00..abb9901fc9e 100644
--- a/src/mongo/db/serverless/shard_split_donor_service.cpp
+++ b/src/mongo/db/serverless/shard_split_donor_service.cpp
@@ -898,111 +898,107 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS
collection.exists());
}
- writeConflictRetry(
- opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() {
- WriteUnitOfWork wuow(opCtx);
-
- if (nextState == ShardSplitDonorStateEnum::kBlocking) {
- // Start blocking writes before getting an oplog slot to guarantee no
- // writes to the tenant's data can commit with a timestamp after the
- // block timestamp.
- auto mtabVector =
- TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
- .getDonorAccessBlockersForMigration(uuid);
- invariant(!mtabVector.empty());
-
- for (auto& mtab : mtabVector) {
- invariant(mtab);
- mtab->startBlockingWrites();
-
- opCtx->recoveryUnit()->onRollback(
- [mtab](OperationContext*) { mtab->rollBackStartBlocking(); });
+ writeConflictRetry(opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS, [&]() {
+ WriteUnitOfWork wuow(opCtx);
+
+ if (nextState == ShardSplitDonorStateEnum::kBlocking) {
+ // Start blocking writes before getting an oplog slot to guarantee no
+ // writes to the tenant's data can commit with a timestamp after the
+ // block timestamp.
+ auto mtabVector =
+ TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext())
+ .getDonorAccessBlockersForMigration(uuid);
+ invariant(!mtabVector.empty());
+
+ for (auto& mtab : mtabVector) {
+ invariant(mtab);
+ mtab->startBlockingWrites();
+
+ opCtx->recoveryUnit()->onRollback(
+ [mtab](OperationContext*) { mtab->rollBackStartBlocking(); });
+ }
+ }
+
+ // Reserve an opTime for the write.
+ auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
+ auto updatedStateDocBson = [&]() {
+ stdx::lock_guard<Latch> lg(_mutex);
+ _stateDoc.setState(nextState);
+ switch (nextState) {
+ case ShardSplitDonorStateEnum::kUninitialized:
+ case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
+ break;
+ case ShardSplitDonorStateEnum::kBlocking:
+ _stateDoc.setBlockOpTime(oplogSlot);
+ break;
+ case ShardSplitDonorStateEnum::kCommitted:
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+ break;
+ case ShardSplitDonorStateEnum::kAborted: {
+ _stateDoc.setCommitOrAbortOpTime(oplogSlot);
+
+ invariant(_abortReason);
+ BSONObjBuilder bob;
+ _abortReason.value().serializeErrorToBSON(&bob);
+ _stateDoc.setAbortReason(bob.obj());
+ break;
}
+ default:
+ MONGO_UNREACHABLE;
+ }
+ if (isInsert) {
+ return BSON("$setOnInsert" << _stateDoc.toBSON());
}
- // Reserve an opTime for the write.
- auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0];
- auto updatedStateDocBson = [&]() {
- stdx::lock_guard<Latch> lg(_mutex);
- _stateDoc.setState(nextState);
- switch (nextState) {
- case ShardSplitDonorStateEnum::kUninitialized:
- case ShardSplitDonorStateEnum::kAbortingIndexBuilds:
- break;
- case ShardSplitDonorStateEnum::kBlocking:
- _stateDoc.setBlockOpTime(oplogSlot);
- break;
- case ShardSplitDonorStateEnum::kCommitted:
- _stateDoc.setCommitOrAbortOpTime(oplogSlot);
- break;
- case ShardSplitDonorStateEnum::kAborted: {
- _stateDoc.setCommitOrAbortOpTime(oplogSlot);
-
- invariant(_abortReason);
- BSONObjBuilder bob;
- _abortReason.value().serializeErrorToBSON(&bob);
- _stateDoc.setAbortReason(bob.obj());
- break;
- }
- default:
- MONGO_UNREACHABLE;
- }
- if (isInsert) {
- return BSON("$setOnInsert" << _stateDoc.toBSON());
- }
+ return _stateDoc.toBSON();
+ }();
- return _stateDoc.toBSON();
- }();
-
- auto updateOpTime = [&]() {
- if (isInsert) {
- const auto filter =
- BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
- auto updateResult = Helpers::upsert(opCtx,
- collection,
- filter,
- updatedStateDocBson,
- /*fromMigrate=*/false);
-
- // '$setOnInsert' update operator can never modify an existing
- // on-disk state doc.
- invariant(!updateResult.existing);
- invariant(!updateResult.numDocsModified);
-
- return repl::ReplClientInfo::forClient(opCtx->getClient())
- .getLastOp();
- }
+ auto updateOpTime = [&]() {
+ if (isInsert) {
+ const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid);
+ auto updateResult = Helpers::upsert(opCtx,
+ collection,
+ filter,
+ updatedStateDocBson,
+ /*fromMigrate=*/false);
- const auto originalRecordId =
- Helpers::findOne(opCtx,
- collection.getCollectionPtr(),
- BSON("_id" << originalStateDocBson["_id"]));
- const auto originalSnapshot = Snapshotted<BSONObj>(
- opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson);
- invariant(!originalRecordId.isNull());
-
- CollectionUpdateArgs args{originalSnapshot.value()};
- args.criteria = BSON("_id" << uuid);
- args.oplogSlots = {oplogSlot};
- args.update = updatedStateDocBson;
-
- collection_internal::updateDocument(
- opCtx,
- collection.getCollectionPtr(),
- originalRecordId,
- originalSnapshot,
- updatedStateDocBson,
- collection_internal::kUpdateNoIndexes,
- nullptr /* indexesAffected */,
- nullptr /* OpDebug* */,
- &args);
-
- return oplogSlot;
- }();
-
- wuow.commit();
- return updateOpTime;
- });
+ // '$setOnInsert' update operator can never modify an existing
+ // on-disk state doc.
+ invariant(!updateResult.existing);
+ invariant(!updateResult.numDocsModified);
+
+ return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ }
+
+ const auto originalRecordId =
+ Helpers::findOne(opCtx,
+ collection.getCollectionPtr(),
+ BSON("_id" << originalStateDocBson["_id"]));
+ const auto originalSnapshot = Snapshotted<BSONObj>(
+ opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson);
+ invariant(!originalRecordId.isNull());
+
+ CollectionUpdateArgs args{originalSnapshot.value()};
+ args.criteria = BSON("_id" << uuid);
+ args.oplogSlots = {oplogSlot};
+ args.update = updatedStateDocBson;
+
+ collection_internal::updateDocument(opCtx,
+ collection.getCollectionPtr(),
+ originalRecordId,
+ originalSnapshot,
+ updatedStateDocBson,
+ collection_internal::kUpdateNoIndexes,
+ nullptr /* indexesAffected */,
+ nullptr /* OpDebug* */,
+ &args);
+
+ return oplogSlot;
+ }();
+
+ wuow.commit();
+ return updateOpTime;
+ });
return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
})