diff options
Diffstat (limited to 'src/mongo/db/op_observer/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl.cpp | 193 |
1 files changed, 96 insertions, 97 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp index 6f4bf660c26..e3ed6a0159c 100644 --- a/src/mongo/db/op_observer/op_observer_impl.cpp +++ b/src/mongo/db/op_observer/op_observer_impl.cpp @@ -1737,7 +1737,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, invariant(!opCtx->lockState()->hasMaxLockTimeout()); writeConflictRetry( - opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] { + opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace, [&] { // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); @@ -2059,107 +2059,106 @@ void OpObserverImpl::onTransactionPrepare( invariant(reservedSlots.size() >= statements.size()); TransactionParticipant::SideTransactionBlock sideTxn(opCtx); - writeConflictRetry( - opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] { + writeConflictRetry(opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace, [&] { + // Writes to the oplog only require a Global intent lock. Guaranteed by + // OplogSlotReserver. + invariant(opCtx->lockState()->isWriteLocked()); + + WriteUnitOfWork wuow(opCtx); + // It is possible that the transaction resulted in no changes, In that case, we + // should not write any operations other than the prepare oplog entry. + if (!statements.empty()) { + // Storage transaction commit is the last place inside a transaction that can + // throw an exception. In order to safely allow exceptions to be thrown at that + // point, this function must be called from an outer WriteUnitOfWork in order to + // be rolled back upon reaching the exception. + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + // Writes to the oplog only require a Global intent lock. Guaranteed by // OplogSlotReserver. invariant(opCtx->lockState()->isWriteLocked()); - WriteUnitOfWork wuow(opCtx); - // It is possible that the transaction resulted in no changes, In that case, we - // should not write any operations other than the prepare oplog entry. - if (!statements.empty()) { - // Storage transaction commit is the last place inside a transaction that can - // throw an exception. In order to safely allow exceptions to be thrown at that - // point, this function must be called from an outer WriteUnitOfWork in order to - // be rolled back upon reaching the exception. - invariant(opCtx->lockState()->inAWriteUnitOfWork()); - - // Writes to the oplog only require a Global intent lock. Guaranteed by - // OplogSlotReserver. - invariant(opCtx->lockState()->isWriteLocked()); - - if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) { - // Partial transactions create/reserve multiple oplog entries in the same - // WriteUnitOfWork. Because of this, such transactions will set multiple - // timestamps, violating the multi timestamp constraint. It's safe to ignore - // the multi timestamp constraints here as additional rollback logic is in - // place for this case. See SERVER-48771. - opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints(); - } - - // This is set for every oplog entry, except for the last one, in the applyOps - // chain of an unprepared multi-doc transaction. - // For a single prepare oplog entry, choose the last oplog slot for the first - // optime of the transaction. The first optime corresponds to the 'startOpTime' - // field in SessionTxnRecord that is persisted in config.transactions. - // See SERVER-40678. - auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U - ? reservedSlots.back() - : reservedSlots.front(); - - auto logApplyOpsForPreparedTransaction = - [opCtx, oplogWriter = _oplogWriter.get(), startOpTime]( - repl::MutableOplogEntry* oplogEntry, - bool firstOp, - bool lastOp, - std::vector<StmtId> stmtIdsWritten) { - return logApplyOps(opCtx, - oplogEntry, - /*txnState=*/ - (lastOp ? DurableTxnStateEnum::kPrepared - : DurableTxnStateEnum::kInProgress), - startOpTime, - std::move(stmtIdsWritten), - /*updateTxnTable=*/(firstOp || lastOp), - oplogWriter); - }; - - // We had reserved enough oplog slots for the worst case where each operation - // produced one oplog entry. When operations are smaller and can be packed, we - // will waste the extra slots. The implicit prepare oplog entry will still use - // the last reserved slot, because the transaction participant has already used - // that as the prepare time. - boost::optional<repl::ReplOperation::ImageBundle> imageToWrite; - invariant(applyOpsOperationAssignment.prepare); - (void)transactionOperations.logOplogEntries(reservedSlots, - applyOpsOperationAssignment, - wallClockTime, - logApplyOpsForPreparedTransaction, - &imageToWrite); - if (imageToWrite) { - writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite); - } - } else { - // Log an empty 'prepare' oplog entry. - // We need to have at least one reserved slot. - invariant(reservedSlots.size() > 0); - BSONObjBuilder applyOpsBuilder; - BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); - opsArray.done(); - applyOpsBuilder.append("prepare", true); - - auto oplogSlot = reservedSlots.front(); - MutableOplogEntry oplogEntry; - oplogEntry.setOpType(repl::OpTypeEnum::kCommand); - oplogEntry.setNss(NamespaceString::kAdminCommandNamespace); - oplogEntry.setOpTime(oplogSlot); - oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); - oplogEntry.setObject(applyOpsBuilder.done()); - oplogEntry.setWallClockTime(wallClockTime); - - // TODO SERVER-69286: set the top-level tenantId here - - logApplyOps(opCtx, - &oplogEntry, - DurableTxnStateEnum::kPrepared, - /*startOpTime=*/oplogSlot, - /*stmtIdsWritten=*/{}, - /*updateTxnTable=*/true, - _oplogWriter.get()); + if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) { + // Partial transactions create/reserve multiple oplog entries in the same + // WriteUnitOfWork. Because of this, such transactions will set multiple + // timestamps, violating the multi timestamp constraint. It's safe to ignore + // the multi timestamp constraints here as additional rollback logic is in + // place for this case. See SERVER-48771. + opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints(); + } + + // This is set for every oplog entry, except for the last one, in the applyOps + // chain of an unprepared multi-doc transaction. + // For a single prepare oplog entry, choose the last oplog slot for the first + // optime of the transaction. The first optime corresponds to the 'startOpTime' + // field in SessionTxnRecord that is persisted in config.transactions. + // See SERVER-40678. + auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U + ? reservedSlots.back() + : reservedSlots.front(); + + auto logApplyOpsForPreparedTransaction = + [opCtx, oplogWriter = _oplogWriter.get(), startOpTime]( + repl::MutableOplogEntry* oplogEntry, + bool firstOp, + bool lastOp, + std::vector<StmtId> stmtIdsWritten) { + return logApplyOps(opCtx, + oplogEntry, + /*txnState=*/ + (lastOp ? DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kInProgress), + startOpTime, + std::move(stmtIdsWritten), + /*updateTxnTable=*/(firstOp || lastOp), + oplogWriter); + }; + + // We had reserved enough oplog slots for the worst case where each operation + // produced one oplog entry. When operations are smaller and can be packed, we + // will waste the extra slots. The implicit prepare oplog entry will still use + // the last reserved slot, because the transaction participant has already used + // that as the prepare time. + boost::optional<repl::ReplOperation::ImageBundle> imageToWrite; + invariant(applyOpsOperationAssignment.prepare); + (void)transactionOperations.logOplogEntries(reservedSlots, + applyOpsOperationAssignment, + wallClockTime, + logApplyOpsForPreparedTransaction, + &imageToWrite); + if (imageToWrite) { + writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite); } - wuow.commit(); - }); + } else { + // Log an empty 'prepare' oplog entry. + // We need to have at least one reserved slot. + invariant(reservedSlots.size() > 0); + BSONObjBuilder applyOpsBuilder; + BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd)); + opsArray.done(); + applyOpsBuilder.append("prepare", true); + + auto oplogSlot = reservedSlots.front(); + MutableOplogEntry oplogEntry; + oplogEntry.setOpType(repl::OpTypeEnum::kCommand); + oplogEntry.setNss(NamespaceString::kAdminCommandNamespace); + oplogEntry.setOpTime(oplogSlot); + oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); + oplogEntry.setObject(applyOpsBuilder.done()); + oplogEntry.setWallClockTime(wallClockTime); + + // TODO SERVER-69286: set the top-level tenantId here + + logApplyOps(opCtx, + &oplogEntry, + DurableTxnStateEnum::kPrepared, + /*startOpTime=*/oplogSlot, + /*stmtIdsWritten=*/{}, + /*updateTxnTable=*/true, + _oplogWriter.get()); + } + wuow.commit(); + }); } shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime); |