summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer/op_observer_impl.cpp193
1 files changed, 96 insertions, 97 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl.cpp b/src/mongo/db/op_observer/op_observer_impl.cpp
index 6f4bf660c26..e3ed6a0159c 100644
--- a/src/mongo/db/op_observer/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl.cpp
@@ -1737,7 +1737,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
invariant(!opCtx->lockState()->hasMaxLockTimeout());
writeConflictRetry(
- opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ opCtx, "onPreparedTransactionCommitOrAbort", NamespaceString::kRsOplogNamespace, [&] {
// Writes to the oplog only require a Global intent lock. Guaranteed by
// OplogSlotReserver.
invariant(opCtx->lockState()->isWriteLocked());
@@ -2059,107 +2059,106 @@ void OpObserverImpl::onTransactionPrepare(
invariant(reservedSlots.size() >= statements.size());
TransactionParticipant::SideTransactionBlock sideTxn(opCtx);
- writeConflictRetry(
- opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace.ns(), [&] {
+ writeConflictRetry(opCtx, "onTransactionPrepare", NamespaceString::kRsOplogNamespace, [&] {
+ // Writes to the oplog only require a Global intent lock. Guaranteed by
+ // OplogSlotReserver.
+ invariant(opCtx->lockState()->isWriteLocked());
+
+ WriteUnitOfWork wuow(opCtx);
+ // It is possible that the transaction resulted in no changes, In that case, we
+ // should not write any operations other than the prepare oplog entry.
+ if (!statements.empty()) {
+ // Storage transaction commit is the last place inside a transaction that can
+ // throw an exception. In order to safely allow exceptions to be thrown at that
+ // point, this function must be called from an outer WriteUnitOfWork in order to
+ // be rolled back upon reaching the exception.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
// Writes to the oplog only require a Global intent lock. Guaranteed by
// OplogSlotReserver.
invariant(opCtx->lockState()->isWriteLocked());
- WriteUnitOfWork wuow(opCtx);
- // It is possible that the transaction resulted in no changes, In that case, we
- // should not write any operations other than the prepare oplog entry.
- if (!statements.empty()) {
- // Storage transaction commit is the last place inside a transaction that can
- // throw an exception. In order to safely allow exceptions to be thrown at that
- // point, this function must be called from an outer WriteUnitOfWork in order to
- // be rolled back upon reaching the exception.
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
-
- // Writes to the oplog only require a Global intent lock. Guaranteed by
- // OplogSlotReserver.
- invariant(opCtx->lockState()->isWriteLocked());
-
- if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) {
- // Partial transactions create/reserve multiple oplog entries in the same
- // WriteUnitOfWork. Because of this, such transactions will set multiple
- // timestamps, violating the multi timestamp constraint. It's safe to ignore
- // the multi timestamp constraints here as additional rollback logic is in
- // place for this case. See SERVER-48771.
- opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints();
- }
-
- // This is set for every oplog entry, except for the last one, in the applyOps
- // chain of an unprepared multi-doc transaction.
- // For a single prepare oplog entry, choose the last oplog slot for the first
- // optime of the transaction. The first optime corresponds to the 'startOpTime'
- // field in SessionTxnRecord that is persisted in config.transactions.
- // See SERVER-40678.
- auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U
- ? reservedSlots.back()
- : reservedSlots.front();
-
- auto logApplyOpsForPreparedTransaction =
- [opCtx, oplogWriter = _oplogWriter.get(), startOpTime](
- repl::MutableOplogEntry* oplogEntry,
- bool firstOp,
- bool lastOp,
- std::vector<StmtId> stmtIdsWritten) {
- return logApplyOps(opCtx,
- oplogEntry,
- /*txnState=*/
- (lastOp ? DurableTxnStateEnum::kPrepared
- : DurableTxnStateEnum::kInProgress),
- startOpTime,
- std::move(stmtIdsWritten),
- /*updateTxnTable=*/(firstOp || lastOp),
- oplogWriter);
- };
-
- // We had reserved enough oplog slots for the worst case where each operation
- // produced one oplog entry. When operations are smaller and can be packed, we
- // will waste the extra slots. The implicit prepare oplog entry will still use
- // the last reserved slot, because the transaction participant has already used
- // that as the prepare time.
- boost::optional<repl::ReplOperation::ImageBundle> imageToWrite;
- invariant(applyOpsOperationAssignment.prepare);
- (void)transactionOperations.logOplogEntries(reservedSlots,
- applyOpsOperationAssignment,
- wallClockTime,
- logApplyOpsForPreparedTransaction,
- &imageToWrite);
- if (imageToWrite) {
- writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite);
- }
- } else {
- // Log an empty 'prepare' oplog entry.
- // We need to have at least one reserved slot.
- invariant(reservedSlots.size() > 0);
- BSONObjBuilder applyOpsBuilder;
- BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
- opsArray.done();
- applyOpsBuilder.append("prepare", true);
-
- auto oplogSlot = reservedSlots.front();
- MutableOplogEntry oplogEntry;
- oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
- oplogEntry.setNss(NamespaceString::kAdminCommandNamespace);
- oplogEntry.setOpTime(oplogSlot);
- oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
- oplogEntry.setObject(applyOpsBuilder.done());
- oplogEntry.setWallClockTime(wallClockTime);
-
- // TODO SERVER-69286: set the top-level tenantId here
-
- logApplyOps(opCtx,
- &oplogEntry,
- DurableTxnStateEnum::kPrepared,
- /*startOpTime=*/oplogSlot,
- /*stmtIdsWritten=*/{},
- /*updateTxnTable=*/true,
- _oplogWriter.get());
+ if (applyOpsOperationAssignment.applyOpsEntries.size() > 1U) {
+ // Partial transactions create/reserve multiple oplog entries in the same
+ // WriteUnitOfWork. Because of this, such transactions will set multiple
+ // timestamps, violating the multi timestamp constraint. It's safe to ignore
+ // the multi timestamp constraints here as additional rollback logic is in
+ // place for this case. See SERVER-48771.
+ opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints();
+ }
+
+ // This is set for every oplog entry, except for the last one, in the applyOps
+ // chain of an unprepared multi-doc transaction.
+ // For a single prepare oplog entry, choose the last oplog slot for the first
+ // optime of the transaction. The first optime corresponds to the 'startOpTime'
+ // field in SessionTxnRecord that is persisted in config.transactions.
+ // See SERVER-40678.
+ auto startOpTime = applyOpsOperationAssignment.applyOpsEntries.size() == 1U
+ ? reservedSlots.back()
+ : reservedSlots.front();
+
+ auto logApplyOpsForPreparedTransaction =
+ [opCtx, oplogWriter = _oplogWriter.get(), startOpTime](
+ repl::MutableOplogEntry* oplogEntry,
+ bool firstOp,
+ bool lastOp,
+ std::vector<StmtId> stmtIdsWritten) {
+ return logApplyOps(opCtx,
+ oplogEntry,
+ /*txnState=*/
+ (lastOp ? DurableTxnStateEnum::kPrepared
+ : DurableTxnStateEnum::kInProgress),
+ startOpTime,
+ std::move(stmtIdsWritten),
+ /*updateTxnTable=*/(firstOp || lastOp),
+ oplogWriter);
+ };
+
+ // We had reserved enough oplog slots for the worst case where each operation
+ // produced one oplog entry. When operations are smaller and can be packed, we
+ // will waste the extra slots. The implicit prepare oplog entry will still use
+ // the last reserved slot, because the transaction participant has already used
+ // that as the prepare time.
+ boost::optional<repl::ReplOperation::ImageBundle> imageToWrite;
+ invariant(applyOpsOperationAssignment.prepare);
+ (void)transactionOperations.logOplogEntries(reservedSlots,
+ applyOpsOperationAssignment,
+ wallClockTime,
+ logApplyOpsForPreparedTransaction,
+ &imageToWrite);
+ if (imageToWrite) {
+ writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), *imageToWrite);
}
- wuow.commit();
- });
+ } else {
+ // Log an empty 'prepare' oplog entry.
+ // We need to have at least one reserved slot.
+ invariant(reservedSlots.size() > 0);
+ BSONObjBuilder applyOpsBuilder;
+ BSONArrayBuilder opsArray(applyOpsBuilder.subarrayStart("applyOps"_sd));
+ opsArray.done();
+ applyOpsBuilder.append("prepare", true);
+
+ auto oplogSlot = reservedSlots.front();
+ MutableOplogEntry oplogEntry;
+ oplogEntry.setOpType(repl::OpTypeEnum::kCommand);
+ oplogEntry.setNss(NamespaceString::kAdminCommandNamespace);
+ oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
+ oplogEntry.setObject(applyOpsBuilder.done());
+ oplogEntry.setWallClockTime(wallClockTime);
+
+ // TODO SERVER-69286: set the top-level tenantId here
+
+ logApplyOps(opCtx,
+ &oplogEntry,
+ DurableTxnStateEnum::kPrepared,
+ /*startOpTime=*/oplogSlot,
+ /*stmtIdsWritten=*/{},
+ /*updateTxnTable=*/true,
+ _oplogWriter.get());
+ }
+ wuow.commit();
+ });
}
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, statements, prepareOpTime);