diff options
Diffstat (limited to 'src/mongo/db/serverless/shard_split_donor_service.cpp')
-rw-r--r-- | src/mongo/db/serverless/shard_split_donor_service.cpp | 196 |
1 files changed, 96 insertions, 100 deletions
diff --git a/src/mongo/db/serverless/shard_split_donor_service.cpp b/src/mongo/db/serverless/shard_split_donor_service.cpp index 0939a556a00..abb9901fc9e 100644 --- a/src/mongo/db/serverless/shard_split_donor_service.cpp +++ b/src/mongo/db/serverless/shard_split_donor_service.cpp @@ -898,111 +898,107 @@ ExecutorFuture<repl::OpTime> ShardSplitDonorService::DonorStateMachine::_updateS collection.exists()); } - writeConflictRetry( - opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS.ns(), [&]() { - WriteUnitOfWork wuow(opCtx); - - if (nextState == ShardSplitDonorStateEnum::kBlocking) { - // Start blocking writes before getting an oplog slot to guarantee no - // writes to the tenant's data can commit with a timestamp after the - // block timestamp. - auto mtabVector = - TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) - .getDonorAccessBlockersForMigration(uuid); - invariant(!mtabVector.empty()); - - for (auto& mtab : mtabVector) { - invariant(mtab); - mtab->startBlockingWrites(); - - opCtx->recoveryUnit()->onRollback( - [mtab](OperationContext*) { mtab->rollBackStartBlocking(); }); + writeConflictRetry(opCtx, "ShardSplitDonorUpdateStateDoc", _stateDocumentsNS, [&]() { + WriteUnitOfWork wuow(opCtx); + + if (nextState == ShardSplitDonorStateEnum::kBlocking) { + // Start blocking writes before getting an oplog slot to guarantee no + // writes to the tenant's data can commit with a timestamp after the + // block timestamp. + auto mtabVector = + TenantMigrationAccessBlockerRegistry::get(opCtx->getServiceContext()) + .getDonorAccessBlockersForMigration(uuid); + invariant(!mtabVector.empty()); + + for (auto& mtab : mtabVector) { + invariant(mtab); + mtab->startBlockingWrites(); + + opCtx->recoveryUnit()->onRollback( + [mtab](OperationContext*) { mtab->rollBackStartBlocking(); }); + } + } + + // Reserve an opTime for the write. + auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; + auto updatedStateDocBson = [&]() { + stdx::lock_guard<Latch> lg(_mutex); + _stateDoc.setState(nextState); + switch (nextState) { + case ShardSplitDonorStateEnum::kUninitialized: + case ShardSplitDonorStateEnum::kAbortingIndexBuilds: + break; + case ShardSplitDonorStateEnum::kBlocking: + _stateDoc.setBlockOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kCommitted: + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + break; + case ShardSplitDonorStateEnum::kAborted: { + _stateDoc.setCommitOrAbortOpTime(oplogSlot); + + invariant(_abortReason); + BSONObjBuilder bob; + _abortReason.value().serializeErrorToBSON(&bob); + _stateDoc.setAbortReason(bob.obj()); + break; } + default: + MONGO_UNREACHABLE; + } + if (isInsert) { + return BSON("$setOnInsert" << _stateDoc.toBSON()); } - // Reserve an opTime for the write. - auto oplogSlot = LocalOplogInfo::get(opCtx)->getNextOpTimes(opCtx, 1U)[0]; - auto updatedStateDocBson = [&]() { - stdx::lock_guard<Latch> lg(_mutex); - _stateDoc.setState(nextState); - switch (nextState) { - case ShardSplitDonorStateEnum::kUninitialized: - case ShardSplitDonorStateEnum::kAbortingIndexBuilds: - break; - case ShardSplitDonorStateEnum::kBlocking: - _stateDoc.setBlockOpTime(oplogSlot); - break; - case ShardSplitDonorStateEnum::kCommitted: - _stateDoc.setCommitOrAbortOpTime(oplogSlot); - break; - case ShardSplitDonorStateEnum::kAborted: { - _stateDoc.setCommitOrAbortOpTime(oplogSlot); - - invariant(_abortReason); - BSONObjBuilder bob; - _abortReason.value().serializeErrorToBSON(&bob); - _stateDoc.setAbortReason(bob.obj()); - break; - } - default: - MONGO_UNREACHABLE; - } - if (isInsert) { - return BSON("$setOnInsert" << _stateDoc.toBSON()); - } + return _stateDoc.toBSON(); + }(); - return _stateDoc.toBSON(); - }(); - - auto updateOpTime = [&]() { - if (isInsert) { - const auto filter = - BSON(ShardSplitDonorDocument::kIdFieldName << uuid); - auto updateResult = Helpers::upsert(opCtx, - collection, - filter, - updatedStateDocBson, - /*fromMigrate=*/false); - - // '$setOnInsert' update operator can never modify an existing - // on-disk state doc. - invariant(!updateResult.existing); - invariant(!updateResult.numDocsModified); - - return repl::ReplClientInfo::forClient(opCtx->getClient()) - .getLastOp(); - } + auto updateOpTime = [&]() { + if (isInsert) { + const auto filter = BSON(ShardSplitDonorDocument::kIdFieldName << uuid); + auto updateResult = Helpers::upsert(opCtx, + collection, + filter, + updatedStateDocBson, + /*fromMigrate=*/false); - const auto originalRecordId = - Helpers::findOne(opCtx, - collection.getCollectionPtr(), - BSON("_id" << originalStateDocBson["_id"])); - const auto originalSnapshot = Snapshotted<BSONObj>( - opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); - invariant(!originalRecordId.isNull()); - - CollectionUpdateArgs args{originalSnapshot.value()}; - args.criteria = BSON("_id" << uuid); - args.oplogSlots = {oplogSlot}; - args.update = updatedStateDocBson; - - collection_internal::updateDocument( - opCtx, - collection.getCollectionPtr(), - originalRecordId, - originalSnapshot, - updatedStateDocBson, - collection_internal::kUpdateNoIndexes, - nullptr /* indexesAffected */, - nullptr /* OpDebug* */, - &args); - - return oplogSlot; - }(); - - wuow.commit(); - return updateOpTime; - }); + // '$setOnInsert' update operator can never modify an existing + // on-disk state doc. + invariant(!updateResult.existing); + invariant(!updateResult.numDocsModified); + + return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + } + + const auto originalRecordId = + Helpers::findOne(opCtx, + collection.getCollectionPtr(), + BSON("_id" << originalStateDocBson["_id"])); + const auto originalSnapshot = Snapshotted<BSONObj>( + opCtx->recoveryUnit()->getSnapshotId(), originalStateDocBson); + invariant(!originalRecordId.isNull()); + + CollectionUpdateArgs args{originalSnapshot.value()}; + args.criteria = BSON("_id" << uuid); + args.oplogSlots = {oplogSlot}; + args.update = updatedStateDocBson; + + collection_internal::updateDocument(opCtx, + collection.getCollectionPtr(), + originalRecordId, + originalSnapshot, + updatedStateDocBson, + collection_internal::kUpdateNoIndexes, + nullptr /* indexesAffected */, + nullptr /* OpDebug* */, + &args); + + return oplogSlot; + }(); + + wuow.commit(); + return updateOpTime; + }); return repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); }) |