summaryrefslogtreecommitdiff
path: root/src/mongo/db/op_observer_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/op_observer_impl.cpp')
-rw-r--r--src/mongo/db/op_observer_impl.cpp315
1 files changed, 254 insertions, 61 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index dda43be72ee..fac3cd11e1e 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/op_observer_impl.h"
+#include <algorithm>
#include <limits>
#include "mongo/bson/bsonobjbuilder.h"
@@ -102,8 +103,12 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
return clockSource->now();
}
-repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
- oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+repl::OpTime logOperation(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ bool assignWallClockTime = true) {
+ if (assignWallClockTime) {
+ oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+ }
auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
auto opTime = repl::logOp(opCtx, oplogEntry);
times.push_back(opTime);
@@ -1320,8 +1325,8 @@ namespace {
*/
void writeChangeStreamPreImagesForApplyOpsEntries(
OperationContext* opCtx,
- const std::vector<repl::ReplOperation>::iterator& stmtBegin,
- const std::vector<repl::ReplOperation>::iterator& stmtEnd,
+ std::vector<repl::ReplOperation>::const_iterator stmtBegin,
+ std::vector<repl::ReplOperation>::const_iterator stmtEnd,
Timestamp applyOpsTimestamp,
Date_t operationTime) {
int64_t applyOpsIndex{0};
@@ -1342,20 +1347,175 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
}
}
+/**
+ * Returns transaction operations that can fit into an "applyOps" entry. The returned operations are
+ * serialized to BSON. The transaction operations are given by range ['operationsBegin',
+ * 'operationsEnd'). Constraints for fitting the operations: (1) the resulting "applyOps" entry
+ * shouldn't exceed the 16MB limit, unless only one operation is allocated to it; (2) the number of
+ * operations is not larger than the maximum number of transaction statements allowed in one entry
+ * as defined by 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'.
+ */
+std::vector<BSONObj> packTransactionOperationsIntoApplyOps(
+ std::vector<repl::ReplOperation>::const_iterator operationsBegin,
+ std::vector<repl::ReplOperation>::const_iterator operationsEnd) {
+ tassert(6278503,
+ "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number",
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0);
+ std::vector<BSONObj> operations;
+ size_t totalOperationsSize{0};
+ for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) {
+ const auto& operation = *operationIter;
+
+ // Stop packing when either number of transaction operations is reached, or when the next
+ // one would make the total size of operations larger than the maximum BSON Object User
+ // Size. We rely on the headroom between BSONObjMaxUserSize and BSONObjMaxInternalSize to
+ // cover the BSON overhead and the other "applyOps" entry fields. But if a single operation
+ // in the set exceeds BSONObjMaxUserSize, we still fit it, as a single max-length operation
+ // should be able to be packed into an "applyOps" entry.
+ if (operations.size() ==
+ static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) ||
+ (operations.size() > 0 &&
+ (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) >
+ BSONObjMaxUserSize))) {
+ break;
+ }
+ auto serializedOperation = operation.toBSON();
+ totalOperationsSize += static_cast<size_t>(serializedOperation.objsize());
+ operations.emplace_back(std::move(serializedOperation));
+ }
+ return operations;
+}
+
+/**
+ * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, their
+ * assignments to "applyOps" entries, and oplog slots to be used for writing pre- and post- image
+ * oplog entries for the transaction consisting of 'operations'. Allocates oplog slots from
+ * 'oplogSlots'. The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a
+ * pre-image to write as a noop oplog entry. The 'prepare' indicates if the function is called when
+ * preparing a transaction.
+ */
+OpObserver::ApplyOpsOplogSlotAndOperationAssignment
+getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& oplogSlots,
+ size_t numberOfPrePostImagesToWrite,
+ bool prepare,
+ std::vector<repl::ReplOperation>& operations) {
+ if (operations.empty()) {
+ return {{}, {}, 0 /*numberOfOplogSlotsUsed*/};
+ }
+ tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size());
+
+ std::vector<OplogSlot> prePostImageOplogEntryOplogSlots;
+ std::vector<OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry> applyOpsEntries;
+ const auto operationCount = operations.size();
+ auto oplogSlotIter = oplogSlots.begin();
+ auto getNextOplogSlot = [&]() {
+ tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end());
+ return *oplogSlotIter++;
+ };
+
+ auto isMigratingTenant = [&opCtx]() {
+ return static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
+ };
+
+ // We never want to store pre-images or post-images when we're migrating oplog entries from
+ // another replica set.
+ if (numberOfPrePostImagesToWrite > 0 && !isMigratingTenant()) {
+ for (size_t operationIdx = 0; operationIdx < operationCount; ++operationIdx) {
+ auto& statement = operations[operationIdx];
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ tassert(6278506, "Expected a pre-image", !statement.getPreImage().isEmpty());
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPreImageOpTime(oplogSlot);
+ }
+ if (!statement.getPostImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPostImageOpTime(oplogSlot);
+ }
+ }
+ }
+
+ auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) {
+ return static_cast<bool>(operation.getNeedsRetryImage());
+ };
+
+ // Assign operations to "applyOps" entries.
+ for (auto operationIt = operations.begin(); operationIt != operations.end();) {
+ auto applyOpsOperations =
+ packTransactionOperationsIntoApplyOps(operationIt, operations.end());
+ const auto opCountWithNeedsRetryImage =
+ std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage);
+ if (opCountWithNeedsRetryImage > 0) {
+ // Reserve a slot for a forged no-op entry.
+ getNextOplogSlot();
+ }
+ operationIt += applyOpsOperations.size();
+ applyOpsEntries.emplace_back(
+ OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry{
+ getNextOplogSlot(), std::move(applyOpsOperations)});
+ }
+
+ // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved
+ // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that.
+ if (prepare) {
+ applyOpsEntries.back().oplogSlot = oplogSlots.back();
+ }
+ return {std::move(prePostImageOplogEntryOplogSlots),
+ std::move(applyOpsEntries),
+ static_cast<size_t>(oplogSlotIter - oplogSlots.begin())};
+}
+
+/**
+ * Writes change stream pre-images for transaction 'operations'. The 'applyOpsOperationAssignment'
+ * contains a representation of "applyOps" entries to be written for the transaction. The
+ * 'operationTime' is wall clock time of the operations used for the pre-image documents.
+ */
+void writeChangeStreamPreImagesForTransaction(
+ OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& operations,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
+ Date_t operationTime) {
+ // This function must be called from an outer WriteUnitOfWork in order to be rolled back upon
+ // reaching the exception.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ auto applyOpsEntriesIt = applyOpsOperationAssignment.applyOpsEntries.begin();
+ for (auto operationIter = operations.begin(); operationIter != operations.end();) {
+ tassert(6278507,
+ "Unexpected end of applyOps entries vector",
+ applyOpsEntriesIt != applyOpsOperationAssignment.applyOpsEntries.end());
+ const auto& applyOpsEntry = *applyOpsEntriesIt++;
+ const auto operationSequenceEnd = operationIter + applyOpsEntry.operations.size();
+ writeChangeStreamPreImagesForApplyOpsEntries(opCtx,
+ operationIter,
+ operationSequenceEnd,
+ applyOpsEntry.oplogSlot.getTimestamp(),
+ operationTime);
+ operationIter = operationSequenceEnd;
+ }
+}
+
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
-// field. Appends as many operations as possible to the array (and their corresponding statement
-// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
-// maximum number of transaction statements allowed in one entry. If any of the statements has
-// a pre-image or post-image that needs to be stored in the image collection, stores it to
-// 'imageToWrite'.
-//
-// Returns an iterator to the first statement that wasn't packed into the applyOps object.
-std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
+// field (and their corresponding statement ids to 'stmtIdsWritten'). The transaction statements are
+// represented as range ['stmtBegin', 'stmtEnd') and BSON serialized objects 'operations'. If any of
+// the statements has a pre-image or post-image that needs to be stored in the image collection,
+// stores it to 'imageToWrite'.
+void packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
std::vector<StmtId>* stmtIdsWritten,
boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite,
std::vector<repl::ReplOperation>::iterator stmtBegin,
- std::vector<repl::ReplOperation>::iterator stmtEnd) {
+ std::vector<repl::ReplOperation>::iterator stmtEnd,
+ const std::vector<BSONObj>& operations) {
+ tassert(6278508,
+ "Number of operations does not match the number of transaction statements",
+ operations.size() == static_cast<size_t>(stmtEnd - stmtBegin));
auto setImageToWrite = [&](const repl::ReplOperation& stmt) {
uassert(6054001,
str::stream() << NamespaceString::kConfigImagesNamespace
@@ -1381,21 +1541,11 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
};
std::vector<repl::ReplOperation>::iterator stmtIter;
+ auto operationsIter = operations.begin();
BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
const auto& stmt = *stmtIter;
- // Stop packing when either number of transaction operations is reached, or when the next
- // one would put the array over the maximum BSON Object User Size. We rely on the head room
- // between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the BSON overhead and the
- // other applyOps fields. But if the array with a single operation exceeds
- // BSONObjMaxUserSize, we still log it, as a single max-length operation should be able to
- // be applied.
- if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
- (opsArray.arrSize() > 0 &&
- (opsArray.len() + DurableOplogEntry::getDurableReplOperationSize(stmt) >
- BSONObjMaxUserSize)))
- break;
- opsArray.append(stmt.toBSON());
+ opsArray.append(*operationsIter++);
const auto stmtIds = stmt.getStatementIds();
stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end());
if (stmt.getNeedsRetryImage()) {
@@ -1413,7 +1563,6 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
e.code() != ErrorCodes::BSONObjectTooLarge);
throw;
}
- return stmtIter;
}
// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON
@@ -1449,7 +1598,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
try {
OpTimeBundle times;
- times.writeOpTime = logOperation(opCtx, oplogEntry);
+ times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/);
times.wallClockTime = oplogEntry->getWallClockTime();
if (updateTxnTable) {
SessionTxnRecord sessionTxnRecord;
@@ -1487,6 +1636,10 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
// number of applyOps written will be used. It also expects that the vector of given statements is
// non-empty.
//
+// The 'applyOpsOperationAssignment' contains BSON serialized transaction statements, their
+// assigment to "applyOps" oplog entries, and oplog slots to be used for writing pre- and post-
+// image oplog entries for a transaction.
+//
// In the case of writing entries for a prepared transaction, the last oplog entry (i.e. the
// implicit prepare) will always be written using the last oplog slot given, even if this means
// skipping over some reserved slots.
@@ -1496,11 +1649,12 @@ int logOplogEntriesForTransaction(
OperationContext* opCtx,
std::vector<repl::ReplOperation>* stmts,
const std::vector<OplogSlot>& oplogSlots,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
boost::optional<ImageBundle>* prePostImageToWriteToImageCollection,
size_t numberOfPrePostImagesToWrite,
- bool prepare) {
+ bool prepare,
+ Date_t wallClockTime) {
invariant(!stmts->empty());
- invariant(stmts->size() <= oplogSlots.size());
// Storage transaction commit is the last place inside a transaction that can throw an
// exception. In order to safely allow exceptions to be thrown at that point, this function must
@@ -1516,15 +1670,17 @@ int logOplogEntriesForTransaction(
invariant(opCtx->lockState()->isWriteLocked());
prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
- auto currOplogSlot = oplogSlots.begin();
+ auto currPrePostImageOplogEntryOplogSlot =
+ applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin();
+
// We never want to store pre-images or post-images when we're migrating oplog entries from
// another replica set.
const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement,
const BSONObj& imageDoc) {
- auto slot = *currOplogSlot;
- ++currOplogSlot;
+ auto slot = *currPrePostImageOplogEntryOplogSlot;
+ ++currPrePostImageOplogEntryOplogSlot;
MutableOplogEntry imageEntry;
imageEntry.setSessionId(*opCtx->getLogicalSessionId());
@@ -1537,7 +1693,7 @@ int logOplogEntriesForTransaction(
imageEntry.setOpTime(slot);
imageEntry.setDestinedRecipient(statement.getDestinedRecipient());
- return logOperation(opCtx, &imageEntry);
+ logOperation(opCtx, &imageEntry);
};
if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
@@ -1551,16 +1707,14 @@ int logOplogEntriesForTransaction(
// image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
// pre-image will be written to the image collection (after all the applyOps oplog
// entries are written).
- auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage());
- statement.setPreImageOpTime(opTime);
+ logPrePostImageNoopEntry(statement, statement.getPreImage());
}
if (!statement.getPostImage().isEmpty() &&
statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
// Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be
// written to the image collection (after all the applyOps oplog entries are
// written).
- auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage());
- statement.setPostImageOpTime(opTime);
+ logPrePostImageNoopEntry(statement, statement.getPostImage());
}
}
}
@@ -1573,12 +1727,22 @@ int logOplogEntriesForTransaction(
// statements have been packed, it should point to stmts.end(), which is the loop's
// termination condition.
auto stmtsIter = stmts->begin();
+ auto applyOpsIter = applyOpsOperationAssignment.applyOpsEntries.begin();
while (stmtsIter != stmts->end()) {
+ tassert(6278509,
+ "Not enough \"applyOps\" entries",
+ applyOpsIter != applyOpsOperationAssignment.applyOpsEntries.end());
+ auto& applyOpsEntry = *applyOpsIter++;
BSONObjBuilder applyOpsBuilder;
boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite;
- auto nextStmt = packTransactionStatementsForApplyOps(
- &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end());
+ const auto nextStmt = stmtsIter + applyOpsEntry.operations.size();
+ packTransactionStatementsForApplyOps(&applyOpsBuilder,
+ &stmtIdsWritten,
+ &imageToWrite,
+ stmtsIter,
+ nextStmt,
+ applyOpsEntry.operations);
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -1590,15 +1754,12 @@ int logOplogEntriesForTransaction(
auto isPartialTxn = !lastOp;
if (imageToWrite) {
- // Reserve an oplog slot for potential forged noop oplog entry for the pre-image or
- // post-image.
uassert(6054002,
str::stream() << NamespaceString::kConfigImagesNamespace
<< " can only store the pre or post image of one "
"findAndModify operation for each "
"transaction",
!(*prePostImageToWriteToImageCollection));
- ++currOplogSlot;
}
if (isPartialTxn || (imageToWrite && !prepare)) {
@@ -1629,12 +1790,6 @@ int logOplogEntriesForTransaction(
// the first and last op.
auto updateTxnTable = firstOp || lastOp;
- // Use the next reserved oplog slot. In the special case of writing the implicit
- // 'prepare' oplog entry, we use the last reserved oplog slot, since callers of this
- // function will expect that timestamp to be used as the 'prepare' timestamp. This
- // may mean we skipped over some reserved slots, but there's no harm in that.
- auto oplogSlot = implicitPrepare ? oplogSlots.back() : *currOplogSlot++;
-
// The first optime of the transaction is always the first oplog slot, except in the
// case of a single prepare oplog entry.
auto firstOpTimeOfTxn =
@@ -1646,8 +1801,9 @@ int logOplogEntriesForTransaction(
auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn);
MutableOplogEntry oplogEntry;
- oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setOpTime(applyOpsEntry.oplogSlot);
oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ oplogEntry.setWallClockTime(wallClockTime);
oplogEntry.setObject(applyOpsBuilder.done());
auto txnState = isPartialTxn
? DurableTxnStateEnum::kInProgress
@@ -1670,15 +1826,11 @@ int logOplogEntriesForTransaction(
prevWriteOpTime.writeOpTime.getTimestamp()};
}
- const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp();
- writeChangeStreamPreImagesForApplyOpsEntries(
- opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime());
-
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
}
- return currOplogSlot - oplogSlots.begin();
+ return applyOpsOperationAssignment.numberOfOplogSlotsUsed;
}
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
@@ -1759,10 +1911,27 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled");
}
+ // Serialize transaction statements to BSON and determine their assignment to "applyOps"
+ // entries.
+ const auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, oplogSlots, numberOfPrePostImagesToWrite, false /*prepare*/, *statements);
+
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+
// Log in-progress entries for the transaction along with the implicit commit.
boost::optional<ImageBundle> imageToWrite;
- int numOplogEntries = logOplogEntriesForTransaction(
- opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false);
+ int numOplogEntries = logOplogEntriesForTransaction(opCtx,
+ statements,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ false /* prepare*/,
+ wallClockTime);
+
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -1799,14 +1968,35 @@ void OpObserverImpl::onPreparedTransactionCommit(
logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted);
}
-void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) {
+std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>
+OpObserverImpl::preTransactionPrepare(OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) {
+ auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, reservedSlots, numberOfPrePostImagesToWrite, true /*prepare*/, *statements);
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+ return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>(
+ std::move(applyOpsOplogSlotAndOperationAssignment));
+}
+
+void OpObserverImpl::onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) {
invariant(!reservedSlots.empty());
const auto prepareOpTime = reservedSlots.back();
invariant(opCtx->getTxnNumber());
invariant(!prepareOpTime.isNull());
+ tassert(6278510,
+ "Operation assignments to applyOps entries should be present",
+ applyOpsOperationAssignment);
// Don't write oplog entry on secondaries.
if (!opCtx->writesAreReplicated()) {
@@ -1837,9 +2027,11 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
logOplogEntriesForTransaction(opCtx,
statements,
reservedSlots,
+ *applyOpsOperationAssignment,
&imageToWrite,
numberOfPrePostImagesToWrite,
- true /* prepare */);
+ true /* prepare */,
+ wallClockTime);
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -1861,6 +2053,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
oplogEntry.setOpTime(oplogSlot);
oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
oplogEntry.setObject(applyOpsBuilder.done());
+ oplogEntry.setWallClockTime(wallClockTime);
logApplyOpsForTransaction(opCtx,
&oplogEntry,
DurableTxnStateEnum::kPrepared,