diff options
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 315 |
1 files changed, 254 insertions, 61 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index dda43be72ee..fac3cd11e1e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/db/op_observer_impl.h" +#include <algorithm> #include <limits> #include "mongo/bson/bsonobjbuilder.h" @@ -102,8 +103,12 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) { return clockSource->now(); } -repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { - oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); +repl::OpTime logOperation(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + bool assignWallClockTime = true) { + if (assignWallClockTime) { + oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); + } auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; auto opTime = repl::logOp(opCtx, oplogEntry); times.push_back(opTime); @@ -1320,8 +1325,8 @@ namespace { */ void writeChangeStreamPreImagesForApplyOpsEntries( OperationContext* opCtx, - const std::vector<repl::ReplOperation>::iterator& stmtBegin, - const std::vector<repl::ReplOperation>::iterator& stmtEnd, + std::vector<repl::ReplOperation>::const_iterator stmtBegin, + std::vector<repl::ReplOperation>::const_iterator stmtEnd, Timestamp applyOpsTimestamp, Date_t operationTime) { int64_t applyOpsIndex{0}; @@ -1342,20 +1347,175 @@ void writeChangeStreamPreImagesForApplyOpsEntries( } } +/** + * Returns transaction operations that can fit into an "applyOps" entry. The returned operations are + * serialized to BSON. The transaction operations are given by range ['operationsBegin', + * 'operationsEnd'). Constraints for fitting the operations: (1) the resulting "applyOps" entry + * shouldn't exceed the 16MB limit, unless only one operation is allocated to it; (2) the number of + * operations is not larger than the maximum number of transaction statements allowed in one entry + * as defined by 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'. + */ +std::vector<BSONObj> packTransactionOperationsIntoApplyOps( + std::vector<repl::ReplOperation>::const_iterator operationsBegin, + std::vector<repl::ReplOperation>::const_iterator operationsEnd) { + tassert(6278503, + "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number", + gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0); + std::vector<BSONObj> operations; + size_t totalOperationsSize{0}; + for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) { + const auto& operation = *operationIter; + + // Stop packing when either number of transaction operations is reached, or when the next + // one would make the total size of operations larger than the maximum BSON Object User + // Size. We rely on the headroom between BSONObjMaxUserSize and BSONObjMaxInternalSize to + // cover the BSON overhead and the other "applyOps" entry fields. But if a single operation + // in the set exceeds BSONObjMaxUserSize, we still fit it, as a single max-length operation + // should be able to be packed into an "applyOps" entry. + if (operations.size() == + static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) || + (operations.size() > 0 && + (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) > + BSONObjMaxUserSize))) { + break; + } + auto serializedOperation = operation.toBSON(); + totalOperationsSize += static_cast<size_t>(serializedOperation.objsize()); + operations.emplace_back(std::move(serializedOperation)); + } + return operations; +} + +/** + * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, their + * assignments to "applyOps" entries, and oplog slots to be used for writing pre- and post- image + * oplog entries for the transaction consisting of 'operations'. Allocates oplog slots from + * 'oplogSlots'. The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a + * pre-image to write as a noop oplog entry. The 'prepare' indicates if the function is called when + * preparing a transaction. + */ +OpObserver::ApplyOpsOplogSlotAndOperationAssignment +getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + OperationContext* opCtx, + const std::vector<OplogSlot>& oplogSlots, + size_t numberOfPrePostImagesToWrite, + bool prepare, + std::vector<repl::ReplOperation>& operations) { + if (operations.empty()) { + return {{}, {}, 0 /*numberOfOplogSlotsUsed*/}; + } + tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size()); + + std::vector<OplogSlot> prePostImageOplogEntryOplogSlots; + std::vector<OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry> applyOpsEntries; + const auto operationCount = operations.size(); + auto oplogSlotIter = oplogSlots.begin(); + auto getNextOplogSlot = [&]() { + tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end()); + return *oplogSlotIter++; + }; + + auto isMigratingTenant = [&opCtx]() { + return static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)); + }; + + // We never want to store pre-images or post-images when we're migrating oplog entries from + // another replica set. + if (numberOfPrePostImagesToWrite > 0 && !isMigratingTenant()) { + for (size_t operationIdx = 0; operationIdx < operationCount; ++operationIdx) { + auto& statement = operations[operationIdx]; + if (statement.isChangeStreamPreImageRecordedInOplog() || + (statement.isPreImageRecordedForRetryableInternalTransaction() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) { + tassert(6278506, "Expected a pre-image", !statement.getPreImage().isEmpty()); + auto oplogSlot = getNextOplogSlot(); + prePostImageOplogEntryOplogSlots.push_back(oplogSlot); + statement.setPreImageOpTime(oplogSlot); + } + if (!statement.getPostImage().isEmpty() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) { + auto oplogSlot = getNextOplogSlot(); + prePostImageOplogEntryOplogSlots.push_back(oplogSlot); + statement.setPostImageOpTime(oplogSlot); + } + } + } + + auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) { + return static_cast<bool>(operation.getNeedsRetryImage()); + }; + + // Assign operations to "applyOps" entries. + for (auto operationIt = operations.begin(); operationIt != operations.end();) { + auto applyOpsOperations = + packTransactionOperationsIntoApplyOps(operationIt, operations.end()); + const auto opCountWithNeedsRetryImage = + std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage); + if (opCountWithNeedsRetryImage > 0) { + // Reserve a slot for a forged no-op entry. + getNextOplogSlot(); + } + operationIt += applyOpsOperations.size(); + applyOpsEntries.emplace_back( + OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry{ + getNextOplogSlot(), std::move(applyOpsOperations)}); + } + + // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved + // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that. + if (prepare) { + applyOpsEntries.back().oplogSlot = oplogSlots.back(); + } + return {std::move(prePostImageOplogEntryOplogSlots), + std::move(applyOpsEntries), + static_cast<size_t>(oplogSlotIter - oplogSlots.begin())}; +} + +/** + * Writes change stream pre-images for transaction 'operations'. The 'applyOpsOperationAssignment' + * contains a representation of "applyOps" entries to be written for the transaction. The + * 'operationTime' is wall clock time of the operations used for the pre-image documents. + */ +void writeChangeStreamPreImagesForTransaction( + OperationContext* opCtx, + const std::vector<repl::ReplOperation>& operations, + const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment, + Date_t operationTime) { + // This function must be called from an outer WriteUnitOfWork in order to be rolled back upon + // reaching the exception. + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + auto applyOpsEntriesIt = applyOpsOperationAssignment.applyOpsEntries.begin(); + for (auto operationIter = operations.begin(); operationIter != operations.end();) { + tassert(6278507, + "Unexpected end of applyOps entries vector", + applyOpsEntriesIt != applyOpsOperationAssignment.applyOpsEntries.end()); + const auto& applyOpsEntry = *applyOpsEntriesIt++; + const auto operationSequenceEnd = operationIter + applyOpsEntry.operations.size(); + writeChangeStreamPreImagesForApplyOpsEntries(opCtx, + operationIter, + operationSequenceEnd, + applyOpsEntry.oplogSlot.getTimestamp(), + operationTime); + operationIter = operationSequenceEnd; + } +} + // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array -// field. Appends as many operations as possible to the array (and their corresponding statement -// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the -// maximum number of transaction statements allowed in one entry. If any of the statements has -// a pre-image or post-image that needs to be stored in the image collection, stores it to -// 'imageToWrite'. -// -// Returns an iterator to the first statement that wasn't packed into the applyOps object. -std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( +// field (and their corresponding statement ids to 'stmtIdsWritten'). The transaction statements are +// represented as range ['stmtBegin', 'stmtEnd') and BSON serialized objects 'operations'. If any of +// the statements has a pre-image or post-image that needs to be stored in the image collection, +// stores it to 'imageToWrite'. +void packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, std::vector<StmtId>* stmtIdsWritten, boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite, std::vector<repl::ReplOperation>::iterator stmtBegin, - std::vector<repl::ReplOperation>::iterator stmtEnd) { + std::vector<repl::ReplOperation>::iterator stmtEnd, + const std::vector<BSONObj>& operations) { + tassert(6278508, + "Number of operations does not match the number of transaction statements", + operations.size() == static_cast<size_t>(stmtEnd - stmtBegin)); auto setImageToWrite = [&](const repl::ReplOperation& stmt) { uassert(6054001, str::stream() << NamespaceString::kConfigImagesNamespace @@ -1381,21 +1541,11 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( }; std::vector<repl::ReplOperation>::iterator stmtIter; + auto operationsIter = operations.begin(); BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { const auto& stmt = *stmtIter; - // Stop packing when either number of transaction operations is reached, or when the next - // one would put the array over the maximum BSON Object User Size. We rely on the head room - // between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the BSON overhead and the - // other applyOps fields. But if the array with a single operation exceeds - // BSONObjMaxUserSize, we still log it, as a single max-length operation should be able to - // be applied. - if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || - (opsArray.arrSize() > 0 && - (opsArray.len() + DurableOplogEntry::getDurableReplOperationSize(stmt) > - BSONObjMaxUserSize))) - break; - opsArray.append(stmt.toBSON()); + opsArray.append(*operationsIter++); const auto stmtIds = stmt.getStatementIds(); stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end()); if (stmt.getNeedsRetryImage()) { @@ -1413,7 +1563,6 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( e.code() != ErrorCodes::BSONObjectTooLarge); throw; } - return stmtIter; } // Logs one applyOps entry and may update the transactions table. Assumes that the given BSON @@ -1449,7 +1598,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, try { OpTimeBundle times; - times.writeOpTime = logOperation(opCtx, oplogEntry); + times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/); times.wallClockTime = oplogEntry->getWallClockTime(); if (updateTxnTable) { SessionTxnRecord sessionTxnRecord; @@ -1487,6 +1636,10 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // number of applyOps written will be used. It also expects that the vector of given statements is // non-empty. // +// The 'applyOpsOperationAssignment' contains BSON serialized transaction statements, their +// assigment to "applyOps" oplog entries, and oplog slots to be used for writing pre- and post- +// image oplog entries for a transaction. +// // In the case of writing entries for a prepared transaction, the last oplog entry (i.e. the // implicit prepare) will always be written using the last oplog slot given, even if this means // skipping over some reserved slots. @@ -1496,11 +1649,12 @@ int logOplogEntriesForTransaction( OperationContext* opCtx, std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, + const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment, boost::optional<ImageBundle>* prePostImageToWriteToImageCollection, size_t numberOfPrePostImagesToWrite, - bool prepare) { + bool prepare, + Date_t wallClockTime) { invariant(!stmts->empty()); - invariant(stmts->size() <= oplogSlots.size()); // Storage transaction commit is the last place inside a transaction that can throw an // exception. In order to safely allow exceptions to be thrown at that point, this function must @@ -1516,15 +1670,17 @@ int logOplogEntriesForTransaction( invariant(opCtx->lockState()->isWriteLocked()); prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); - auto currOplogSlot = oplogSlots.begin(); + auto currPrePostImageOplogEntryOplogSlot = + applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin(); + // We never want to store pre-images or post-images when we're migrating oplog entries from // another replica set. const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx); auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement, const BSONObj& imageDoc) { - auto slot = *currOplogSlot; - ++currOplogSlot; + auto slot = *currPrePostImageOplogEntryOplogSlot; + ++currPrePostImageOplogEntryOplogSlot; MutableOplogEntry imageEntry; imageEntry.setSessionId(*opCtx->getLogicalSessionId()); @@ -1537,7 +1693,7 @@ int logOplogEntriesForTransaction( imageEntry.setOpTime(slot); imageEntry.setDestinedRecipient(statement.getDestinedRecipient()); - return logOperation(opCtx, &imageEntry); + logOperation(opCtx, &imageEntry); }; if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { @@ -1551,16 +1707,14 @@ int logOplogEntriesForTransaction( // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the // pre-image will be written to the image collection (after all the applyOps oplog // entries are written). - auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage()); - statement.setPreImageOpTime(opTime); + logPrePostImageNoopEntry(statement, statement.getPreImage()); } if (!statement.getPostImage().isEmpty() && statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) { // Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be // written to the image collection (after all the applyOps oplog entries are // written). - auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage()); - statement.setPostImageOpTime(opTime); + logPrePostImageNoopEntry(statement, statement.getPostImage()); } } } @@ -1573,12 +1727,22 @@ int logOplogEntriesForTransaction( // statements have been packed, it should point to stmts.end(), which is the loop's // termination condition. auto stmtsIter = stmts->begin(); + auto applyOpsIter = applyOpsOperationAssignment.applyOpsEntries.begin(); while (stmtsIter != stmts->end()) { + tassert(6278509, + "Not enough \"applyOps\" entries", + applyOpsIter != applyOpsOperationAssignment.applyOpsEntries.end()); + auto& applyOpsEntry = *applyOpsIter++; BSONObjBuilder applyOpsBuilder; boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite; - auto nextStmt = packTransactionStatementsForApplyOps( - &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end()); + const auto nextStmt = stmtsIter + applyOpsEntry.operations.size(); + packTransactionStatementsForApplyOps(&applyOpsBuilder, + &stmtIdsWritten, + &imageToWrite, + stmtsIter, + nextStmt, + applyOpsEntry.operations); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. @@ -1590,15 +1754,12 @@ int logOplogEntriesForTransaction( auto isPartialTxn = !lastOp; if (imageToWrite) { - // Reserve an oplog slot for potential forged noop oplog entry for the pre-image or - // post-image. uassert(6054002, str::stream() << NamespaceString::kConfigImagesNamespace << " can only store the pre or post image of one " "findAndModify operation for each " "transaction", !(*prePostImageToWriteToImageCollection)); - ++currOplogSlot; } if (isPartialTxn || (imageToWrite && !prepare)) { @@ -1629,12 +1790,6 @@ int logOplogEntriesForTransaction( // the first and last op. auto updateTxnTable = firstOp || lastOp; - // Use the next reserved oplog slot. In the special case of writing the implicit - // 'prepare' oplog entry, we use the last reserved oplog slot, since callers of this - // function will expect that timestamp to be used as the 'prepare' timestamp. This - // may mean we skipped over some reserved slots, but there's no harm in that. - auto oplogSlot = implicitPrepare ? oplogSlots.back() : *currOplogSlot++; - // The first optime of the transaction is always the first oplog slot, except in the // case of a single prepare oplog entry. auto firstOpTimeOfTxn = @@ -1646,8 +1801,9 @@ int logOplogEntriesForTransaction( auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn); MutableOplogEntry oplogEntry; - oplogEntry.setOpTime(oplogSlot); + oplogEntry.setOpTime(applyOpsEntry.oplogSlot); oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + oplogEntry.setWallClockTime(wallClockTime); oplogEntry.setObject(applyOpsBuilder.done()); auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress @@ -1670,15 +1826,11 @@ int logOplogEntriesForTransaction( prevWriteOpTime.writeOpTime.getTimestamp()}; } - const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp(); - writeChangeStreamPreImagesForApplyOpsEntries( - opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime()); - // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; } - return currOplogSlot - oplogSlots.begin(); + return applyOpsOperationAssignment.numberOfOplogSlotsUsed; } void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, @@ -1759,10 +1911,27 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled"); } + // Serialize transaction statements to BSON and determine their assignment to "applyOps" + // entries. + const auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, oplogSlots, numberOfPrePostImagesToWrite, false /*prepare*/, *statements); + + const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + writeChangeStreamPreImagesForTransaction( + opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime); + // Log in-progress entries for the transaction along with the implicit commit. boost::optional<ImageBundle> imageToWrite; - int numOplogEntries = logOplogEntriesForTransaction( - opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false); + int numOplogEntries = logOplogEntriesForTransaction(opCtx, + statements, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + false /* prepare*/, + wallClockTime); + if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1799,14 +1968,35 @@ void OpObserverImpl::onPreparedTransactionCommit( logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted); } -void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) { +std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> +OpObserverImpl::preTransactionPrepare(OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) { + auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, reservedSlots, numberOfPrePostImagesToWrite, true /*prepare*/, *statements); + writeChangeStreamPreImagesForTransaction( + opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime); + return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>( + std::move(applyOpsOplogSlotAndOperationAssignment)); +} + +void OpObserverImpl::onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) { invariant(!reservedSlots.empty()); const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); invariant(!prepareOpTime.isNull()); + tassert(6278510, + "Operation assignments to applyOps entries should be present", + applyOpsOperationAssignment); // Don't write oplog entry on secondaries. if (!opCtx->writesAreReplicated()) { @@ -1837,9 +2027,11 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, logOplogEntriesForTransaction(opCtx, statements, reservedSlots, + *applyOpsOperationAssignment, &imageToWrite, numberOfPrePostImagesToWrite, - true /* prepare */); + true /* prepare */, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1861,6 +2053,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, oplogEntry.setOpTime(oplogSlot); oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); oplogEntry.setObject(applyOpsBuilder.done()); + oplogEntry.setWallClockTime(wallClockTime); logApplyOpsForTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kPrepared, |