summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author Cheahuychou Mao <mao.cheahuychou@gmail.com> 2021-10-26 04:04:34 +0000
committer Evergreen Agent <no-reply@evergreen.mongodb.com> 2021-11-20 04:49:23 +0000
commit e3b5c6069197d478929449eec87f8bfcc58bc7bd (patch)
tree 214e9c5d8f6ea7f431f0eb537f427e0e4040391c /src/mongo
parent 95ba764c2c7a787e117536fe5632fe484f9178c8 (diff)
download mongo-e3b5c6069197d478929449eec87f8bfcc58bc7bd.tar.gz
SERVER-60540 Add retryability support for internal transactions for findAndModify
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/SConscript 1
-rw-r--r-- src/mongo/db/catalog/collection_impl.cpp 130
-rw-r--r-- src/mongo/db/commands/find_and_modify.cpp 2
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 198
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp 946
-rw-r--r-- src/mongo/db/ops/write_ops.idl 4
-rw-r--r-- src/mongo/db/repl/oplog.cpp 40
-rw-r--r-- src/mongo/db/repl/oplog_entry.h 34
-rw-r--r-- src/mongo/db/repl/transaction_oplog_application.cpp 6
-rw-r--r-- src/mongo/db/transaction_participant.cpp 4
-rw-r--r-- src/mongo/db/transaction_participant.h 4
11 files changed, 958 insertions, 411 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index fba19caaced..1fe478e17d5 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2473,6 +2473,7 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/catalog/catalog_test_fixture',
'$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry',
'$BUILD_DIR/mongo/db/catalog/index_build_entry_idl',
+ '$BUILD_DIR/mongo/db/catalog/local_oplog_info',
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/query/common_query_enums_and_helpers',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp
index 7ba1084c750..8d6afd948ab 100644
--- a/src/mongo/db/catalog/collection_impl.cpp
+++ b/src/mongo/db/catalog/collection_impl.cpp
@@ -260,32 +260,44 @@ Status validateChangeStreamPreAndPostImagesOptionIsPermitted(const NamespaceStri
return Status::OK();
}
+/**
+ * Returns true if we are running retryable write or retryable internal multi-document transaction.
+ */
bool isRetryableWrite(OperationContext* opCtx) {
+ if (!opCtx->writesAreReplicated() || !opCtx->isRetryableWrite()) {
+ return false;
+ }
auto txnParticipant = TransactionParticipant::get(opCtx);
- const bool inMultiDocumentTransaction = txnParticipant && txnParticipant.transactionIsOpen();
- return !inMultiDocumentTransaction && opCtx->writesAreReplicated() && opCtx->getTxnNumber();
+ return txnParticipant &&
+ (!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen());
+}
+
+bool shouldStoreImageInSideCollection(OperationContext* opCtx) {
+ // Check if we're in a retryable write that should save the image to `config.image_collection`.
+ // This is the only time `storeFindAndModifyImagesInSideCollection` may be queried for this
+ // transaction.
+ return isRetryableWrite(opCtx) &&
+ repl::feature_flags::gFeatureFlagRetryableFindAndModify.isEnabledAndIgnoreFCV() &&
+ repl::gStoreFindAndModifyImagesInSideCollection.load();
}
std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx,
const int numSlots) {
- if (isRetryableWrite(opCtx)) {
- // Check if we're in a retryable write that should save the image to
- // `config.image_collection`. This is the only time
- // `storeFindAndModifyImagesInSideCollection` may be queried for this transaction.
- const bool storeImageInSideCollection =
- repl::feature_flags::gFeatureFlagRetryableFindAndModify.isEnabledAndIgnoreFCV() &&
- repl::gStoreFindAndModifyImagesInSideCollection.load();
- if (storeImageInSideCollection) {
- // We reserve oplog slots here, expecting the slot with the greatest timestmap (say TS)
- // to be used as the oplog timestamp. Tenant migrations and resharding will forge no-op
- // image oplog entries and set the timestamp for these synthetic entries to be TS - 1.
- auto oplogInfo = LocalOplogInfo::get(opCtx);
- auto slots = oplogInfo->getNextOpTimes(opCtx, numSlots);
- uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp()));
- return slots;
- }
+ invariant(isRetryableWrite(opCtx));
+
+ // For retryable findAndModify running in a multi-document transaction, we will reserve the
+ // oplog entries when the transaction prepares or commits without prepare.
+ if (opCtx->inMultiDocumentTransaction()) {
+ return {};
}
- return {};
+
+ // We reserve oplog slots here, expecting the slot with the greatest timestamp (say TS) to be
+ // used as the oplog timestamp. Tenant migrations and resharding will forge no-op image oplog
+ // entries and set the timestamp for these synthetic entries to be TS - 1.
+ auto oplogInfo = LocalOplogInfo::get(opCtx);
+ auto slots = oplogInfo->getNextOpTimes(opCtx, numSlots);
+ uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp()));
+ return slots;
}
@@ -1168,14 +1180,16 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx,
}
std::vector<OplogSlot> oplogSlots;
- if (storeDeletedDoc == Collection::StoreDeletedDoc::On && !getRecordPreImages()) {
- oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, 2);
- }
auto retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kNone;
- if (storeDeletedDoc == Collection::StoreDeletedDoc::On && isRetryableWrite(opCtx)) {
+ if (storeDeletedDoc == Collection::StoreDeletedDoc::On && !getRecordPreImages() &&
+ isRetryableWrite(opCtx)) {
+ const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx);
retryableFindAndModifyLocation =
- (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog
- : RetryableFindAndModifyLocation::kSideCollection);
+ (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection
+ : RetryableFindAndModifyLocation::kOplog);
+ if (storeImageInSideCollection) {
+ oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, 2);
+ }
}
OplogDeleteEntryArgs deleteArgs{nullptr /* deletedDoc */,
fromMigrate,
@@ -1284,23 +1298,24 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx,
const bool setNeedsRetryImageOplogField =
args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
if (args->oplogSlots.empty() && setNeedsRetryImageOplogField) {
- // If the update is part of a retryable write and we expect to be storing the pre- or post-
- // image in a side collection, then we must reserve oplog slots in advance. We expect to
- // use the reserved oplog slots as follows, where TS is the greatest timestamp of
- // 'oplogSlots':
- // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we must
- // account for storing a pre-image in the oplog and an eventual synthetic no-op
- // image oplog used by tenant migrations/resharding.
- // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
- // the entry timestamps to TS - 1.
- // TS: The timestamp given to the update oplog entry.
- const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2;
- const auto oplogSlots =
- reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve);
- args->oplogSlots = oplogSlots;
+ const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx);
onUpdateArgs.retryableFindAndModifyLocation =
- (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog
- : RetryableFindAndModifyLocation::kSideCollection);
+ (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection
+ : RetryableFindAndModifyLocation::kOplog);
+ if (storeImageInSideCollection) {
+ // If the update is part of a retryable write and we expect to be storing the pre- or
+ // post-image in a side collection, then we must reserve oplog slots in advance. We
+ // expect to use the reserved oplog slots as follows, where TS is the greatest
+ // timestamp of 'oplogSlots':
+ // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we
+ // must account for storing a pre-image in the oplog and an eventual synthetic
+ // no-op image oplog used by tenant migrations/resharding.
+ // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
+ // the entry timestamps to TS - 1.
+ // TS: The timestamp given to the update oplog entry.
+ const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2;
+ args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve);
+ }
} else {
// Retryable findAndModify commands should not reserve oplog slots before entering this
// function since tenant migrations and resharding rely on always being able to set
@@ -1365,23 +1380,24 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages(
const bool setNeedsRetryImageOplogField =
args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None;
if (args->oplogSlots.empty() && setNeedsRetryImageOplogField) {
- // If the update is part of a retryable write and we expect to be storing the pre- or post-
- // image in a side collection, then we must reserve oplog slots in advance. We expect to
- // use the reserved oplog slots as follows, where TS is the greatest timestamp of
- // 'oplogSlots':
- // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we must
- // account for storing a pre-image in the oplog and an eventual synthetic no-op
- // image oplog used by tenant migrations/resharding.
- // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
- // the entry timestamps to TS - 1.
- // TS: The timestamp given to the update oplog entry.
- const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2;
- const auto oplogSlots =
- reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve);
- args->oplogSlots = oplogSlots;
+ const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx);
onUpdateArgs.retryableFindAndModifyLocation =
- (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog
- : RetryableFindAndModifyLocation::kSideCollection);
+ (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection
+ : RetryableFindAndModifyLocation::kOplog);
+ if (storeImageInSideCollection) {
+ // If the update is part of a retryable write and we expect to be storing the pre- or
+ // post-image in a side collection, then we must reserve oplog slots in advance. We
+ // expect to use the reserved oplog slots as follows, where TS is the greatest
+ // timestamp of 'oplogSlots':
+ // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we
+ // must account for storing a pre-image in the oplog and an eventual synthetic
+ // no-op image oplog used by tenant migrations/resharding.
+ // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set
+ // the entry timestamps to TS - 1.
+ // TS: The timestamp given to the update oplog entry.
+ const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2;
+ args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve);
+ }
} else {
// Retryable findAndModify commands should not reserve oplog slots before entering this
// function since tenant migrations and resharding rely on always being able to set
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index 2b835de62ee..8698fcf9f75 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -639,7 +639,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun(
!(inTransaction && replCoord->isOplogDisabledFor(opCtx, nsString)));
- const auto stmtId = 0;
+ const auto stmtId = req.getStmtId().value_or(0);
if (opCtx->isRetryableWrite()) {
const auto txnParticipant = TransactionParticipant::get(opCtx);
if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) {
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index bb7b7508f0d..e92a2582bc6 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -169,6 +169,12 @@ struct OpTimeBundle {
Date_t wallClockTime;
};
+struct ImageBundle {
+ repl::RetryImageEnum imageKind;
+ BSONObj imageDoc;
+ Timestamp timestamp;
+};
+
/**
* Write oplog entry(ies) for the update operation.
*/
@@ -682,14 +688,29 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria);
if (inRetryableInternalTransaction) {
operation.setInitializedStatementIds(args.updateArgs->stmtIds);
- }
- operation.setDestinedRecipient(
- shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
-
- if (args.updateArgs->preImageRecordingEnabledForCollection) {
+ if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) {
+ invariant(args.updateArgs->preImageDoc);
+ operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ }
+ }
+ if (args.updateArgs->storeDocOption ==
+ CollectionUpdateArgs::StoreDocOption::PostImage) {
+ invariant(!args.updateArgs->updatedDoc.isEmpty());
+ operation.setPostImage(args.updateArgs->updatedDoc.getOwned());
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPostImage);
+ }
+ }
+ } else if (args.updateArgs->preImageRecordingEnabledForCollection) {
invariant(args.updateArgs->preImageDoc);
operation.setPreImage(args.updateArgs->preImageDoc->getOwned());
}
+ operation.setDestinedRecipient(
+ shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc));
txnParticipant.addTransactionOperation(opCtx, operation);
} else {
@@ -818,19 +839,29 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
OpTimeBundle opTime;
if (inMultiDocumentTransaction) {
- tassert(5868700,
- "Attempted a retryable write within a multi-document transaction",
- args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kNone);
-
const bool inRetryableInternalTransaction =
isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId());
+ tassert(5868700,
+ "Attempted a retryable write within a non-retryable multi-document transaction",
+ inRetryableInternalTransaction ||
+ args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kNone);
+
auto operation =
MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId());
if (inRetryableInternalTransaction) {
operation.setInitializedStatementIds({stmtId});
+ if (args.retryableFindAndModifyLocation != RetryableFindAndModifyLocation::kNone) {
+ tassert(6054000,
+ "Deleted document must be present for pre-image recording",
+ args.deletedDoc);
+ operation.setPreImage(args.deletedDoc->getOwned());
+ if (args.retryableFindAndModifyLocation ==
+ RetryableFindAndModifyLocation::kSideCollection) {
+ operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage);
+ }
+ }
}
-
if (args.preImageRecordingEnabledForCollection) {
tassert(5868701,
"Deleted document must be present for pre-image recording",
@@ -1253,14 +1284,40 @@ namespace {
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
// field. Appends as many operations as possible to the array (and their corresponding statement
// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
-// maximum number of transaction statements allowed in one entry.
+// maximum number of transaction statements allowed in one entry. If any of the statements has
+// a pre-image or post-image that needs to be stored in the image collection, stores it to
+// 'imageToWrite'.
//
// Returns an iterator to the first statement that wasn't packed into the applyOps object.
std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
std::vector<StmtId>* stmtIdsWritten,
+ boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite,
std::vector<repl::ReplOperation>::iterator stmtBegin,
std::vector<repl::ReplOperation>::iterator stmtEnd) {
+ auto setImageToWrite = [&](const repl::ReplOperation& stmt) {
+ uassert(6054001,
+ str::stream() << NamespaceString::kConfigImagesNamespace
+ << " can only store the pre or post image of one "
+ "findAndModify operation for each "
+ "transaction",
+ !(*imageToWrite));
+ switch (*stmt.getNeedsRetryImage()) {
+ case repl::RetryImageEnum::kPreImage: {
+ invariant(!stmt.getPreImage().isEmpty());
+ *imageToWrite = std::make_pair(repl::RetryImageEnum::kPreImage, stmt.getPreImage());
+ break;
+ }
+ case repl::RetryImageEnum::kPostImage: {
+ invariant(!stmt.getPostImage().isEmpty());
+ *imageToWrite =
+ std::make_pair(repl::RetryImageEnum::kPostImage, stmt.getPostImage());
+ break;
+ }
+ default:
+ MONGO_UNREACHABLE;
+ }
+ };
std::vector<repl::ReplOperation>::iterator stmtIter;
BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
@@ -1280,6 +1337,9 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
opsArray.append(stmt.toBSON());
const auto stmtIds = stmt.getStatementIds();
stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end());
+ if (stmt.getNeedsRetryImage()) {
+ setImageToWrite(stmt);
+ }
}
try {
// BSONArrayBuilder will throw a BSONObjectTooLarge exception if we exceeded the max BSON
@@ -1374,11 +1434,13 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
// skipping over some reserved slots.
//
// The number of oplog entries written is returned.
-int logOplogEntriesForTransaction(OperationContext* opCtx,
- std::vector<repl::ReplOperation>* stmts,
- const std::vector<OplogSlot>& oplogSlots,
- size_t numberOfPrePostImagesToWrite,
- bool prepare) {
+int logOplogEntriesForTransaction(
+ OperationContext* opCtx,
+ std::vector<repl::ReplOperation>* stmts,
+ const std::vector<OplogSlot>& oplogSlots,
+ boost::optional<ImageBundle>* prePostImageToWriteToImageCollection,
+ size_t numberOfPrePostImagesToWrite,
+ bool prepare) {
invariant(!stmts->empty());
invariant(stmts->size() <= oplogSlots.size());
@@ -1398,28 +1460,44 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
auto currOplogSlot = oplogSlots.begin();
- // We never want to store pre-images when we're migrating oplog entries from another
- // replica set.
+ // We never want to store pre-images or post-images when we're migrating oplog entries from
+ // another replica set.
const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
- if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
- for (auto& statement : *stmts) {
- if (statement.getPreImage().isEmpty()) {
- continue;
- }
+ auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement,
+ const BSONObj& imageDoc) {
+ auto slot = *currOplogSlot;
+ ++currOplogSlot;
- auto slot = *currOplogSlot;
- ++currOplogSlot;
+ MutableOplogEntry imageEntry;
+ imageEntry.setOpType(repl::OpTypeEnum::kNoop);
+ imageEntry.setObject(imageDoc);
+ imageEntry.setNss(statement.getNss());
+ imageEntry.setUuid(statement.getUuid());
+ imageEntry.setOpTime(slot);
- MutableOplogEntry preImageEntry;
- preImageEntry.setOpType(repl::OpTypeEnum::kNoop);
- preImageEntry.setObject(statement.getPreImage());
- preImageEntry.setNss(statement.getNss());
- preImageEntry.setUuid(statement.getUuid());
- preImageEntry.setOpTime(slot);
+ return logOperation(opCtx, &imageEntry);
+ };
- auto opTime = logOperation(opCtx, &preImageEntry);
- statement.setPreImageOpTime(opTime);
+ if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
+ for (auto& statement : *stmts) {
+ if (!statement.getPreImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) {
+ // Note that 'needsRetryImage' stores the image kind that needs to be stored in the
+ // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
+ // pre-image will be written to the image collection (after all the applyOps oplog
+ // entries are written).
+ auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage());
+ statement.setPreImageOpTime(opTime);
+ }
+ if (!statement.getPostImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
+ // Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be
+ // written to the image collection (after all the applyOps oplog entries are
+ // written).
+ auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage());
+ statement.setPostImageOpTime(opTime);
+ }
}
}
@@ -1432,10 +1510,11 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
// termination condition.
auto stmtsIter = stmts->begin();
while (stmtsIter != stmts->end()) {
-
BSONObjBuilder applyOpsBuilder;
+ boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite;
+
auto nextStmt = packTransactionStatementsForApplyOps(
- &applyOpsBuilder, &stmtIdsWritten, stmtsIter, stmts->end());
+ &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end());
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -1445,9 +1524,23 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
auto implicitCommit = lastOp && !prepare;
auto implicitPrepare = lastOp && prepare;
auto isPartialTxn = !lastOp;
- if (isPartialTxn) {
- // Partial transactions create multiple oplog entries in the same WriteUnitOfWork.
- // Because of this, partial transactions will set multiple timestamps, violating the
+
+ if (imageToWrite) {
+ // Reserve an oplog slot for a potential forged noop oplog entry for the pre-image or
+ // post-image.
+ uassert(6054002,
+ str::stream() << NamespaceString::kConfigImagesNamespace
+ << " can only store the pre or post image of one "
+ "findAndModify operation for each "
+ "transaction",
+ !(*prePostImageToWriteToImageCollection));
+ ++currOplogSlot;
+ }
+
+ if (isPartialTxn || (imageToWrite && !prepare)) {
+ // Partial transactions and unprepared transactions with pre or post image stored in the
+ // image collection create/reserve multiple oplog entries in the same WriteUnitOfWork.
+ // Because of this, such transactions will set multiple timestamps, violating the
// multi timestamp constraint. It's safe to ignore the multi timestamp constraints here
// as additional rollback logic is in place for this case.
opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints();
@@ -1505,6 +1598,14 @@ int logOplogEntriesForTransaction(OperationContext* opCtx,
hangAfterLoggingApplyOpsForTransaction.pauseWhileSet();
+ if (imageToWrite) {
+ invariant(!(*prePostImageToWriteToImageCollection));
+ *prePostImageToWriteToImageCollection =
+ ImageBundle{imageToWrite->first,
+ imageToWrite->second,
+ prevWriteOpTime.writeOpTime.getTimestamp()};
+ }
+
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
numEntriesWritten++;
@@ -1595,8 +1696,17 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
}
// Log in-progress entries for the transaction along with the implicit commit.
+ boost::optional<ImageBundle> imageToWrite;
int numOplogEntries = logOplogEntriesForTransaction(
- opCtx, statements, oplogSlots, numberOfPrePostImagesToWrite, false);
+ opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false);
+ if (imageToWrite) {
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ imageToWrite->timestamp,
+ imageToWrite->imageKind,
+ imageToWrite->imageDoc);
+ }
+
commitOpTime = oplogSlots[numOplogEntries - 1];
invariant(!commitOpTime.isNull());
shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime);
@@ -1659,12 +1769,20 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
// will waste the extra slots. The implicit prepare oplog entry will still use
// the last reserved slot, because the transaction participant has already used
// that as the prepare time.
+ boost::optional<ImageBundle> imageToWrite;
logOplogEntriesForTransaction(opCtx,
statements,
reservedSlots,
+ &imageToWrite,
numberOfPrePostImagesToWrite,
true /* prepare */);
-
+ if (imageToWrite) {
+ writeToImageCollection(opCtx,
+ *opCtx->getLogicalSessionId(),
+ imageToWrite->timestamp,
+ imageToWrite->imageKind,
+ imageToWrite->imageDoc);
+ }
} else {
// Log an empty 'prepare' oplog entry.
// We need to have at least one reserved slot.
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 740f24808a5..fbfd560f674 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/read_write_concern_defaults.h"
#include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h"
+#include "mongo/db/repl/apply_ops_command_info.h"
#include "mongo/db/repl/image_collection_entry_gen.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
@@ -71,6 +72,72 @@ namespace {
using repl::OplogEntry;
using unittest::assertGet;
+namespace {
+
+OplogEntry getInnerEntryFromApplyOpsOplogEntry(const OplogEntry& oplogEntry) {
+ std::vector<repl::OplogEntry> innerEntries;
+ ASSERT(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps);
+ repl::ApplyOps::extractOperationsTo(oplogEntry, oplogEntry.getEntry().toBSON(), &innerEntries);
+ ASSERT_EQ(innerEntries.size(), 1u);
+ return innerEntries[0];
+}
+
+void beginRetryableWriteWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ boost::none /* autocommit */,
+ boost::none /* startTransaction */);
+};
+
+void beginNonRetryableTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
+void beginRetryableInternalTransactionWithTxnNumber(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::unique_ptr<MongoDOperationContextSession>& contextSession) {
+ RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", true};
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+
+ opCtx->setLogicalSessionId(makeLogicalSessionIdWithTxnNumberAndUUIDForTest());
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+
+ contextSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */);
+};
+
+template <typename OpObserverType>
+void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObserver) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx);
+ opObserver.onUnpreparedTransactionCommit(
+ opCtx, &txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest());
+}
+
+} // namespace
+
class OpObserverTest : public ServiceContextMongoDTest {
public:
void setUp() override {
@@ -95,6 +162,10 @@ public:
ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn());
}
+ void tearDown() override {
+ serverGlobalParams.clusterRole = ClusterRole::None;
+ }
+
void reset(OperationContext* opCtx, NamespaceString nss) const {
writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] {
opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp);
@@ -163,6 +234,11 @@ protected:
return getNOplogEntries(opCtx, 1).back();
}
+ BSONObj getInnerEntryFromSingleApplyOpsOplogEntry(OperationContext* opCtx) {
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(getNOplogEntries(opCtx, 1).back()));
+ return getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry).getEntry().toBSON();
+ }
+
bool didWriteImageEntryToSideCollection(OperationContext* opCtx,
const LogicalSessionId& sessionId) {
AutoGetCollection sideCollection(
@@ -739,17 +815,9 @@ public:
void setUp() override {
OpObserverTest::setUp();
_opCtx = cc().makeOperationContext();
-
_opObserver.emplace();
-
MongoDSessionCatalog::onStepUp(opCtx());
_times.emplace(opCtx());
-
- opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx()->setTxnNumber(txnNum());
- opCtx()->setInMultiDocumentTransaction();
- _sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx());
- _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
}
void tearDown() override {
@@ -760,6 +828,20 @@ public:
OpObserverTest::tearDown();
}
+ void setUpRetryableWrite() {
+ beginRetryableWriteWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpNonRetryableTransaction() {
+ beginNonRetryableTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
+
+ void setUpRetryableInternalTransaction() {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout);
+ _txnParticipant.emplace(TransactionParticipant::get(opCtx()));
+ }
protected:
Session* session() {
@@ -806,10 +888,7 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest {
public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(opCtx(),
- {*opCtx()->getTxnNumber()},
- false /* autocommit */,
- true /* startTransaction */);
+ OpObserverTxnParticipantTest::setUpNonRetryableTransaction();
}
protected:
@@ -1498,109 +1577,214 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
*/
class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest {
public:
+ void tearDown() override {
+ OpObserverTxnParticipantTest::tearDown();
+ }
+
+protected:
+ // Shared test body: reports an update that stores its post-image with
+ // retryableFindAndModifyLocation = kSideCollection to the observer, commits through the
+ // subclass-provided commit(), and checks that the entry returned by assertGetSingleOplogEntry()
+ // has needsRetryImage == "postImage" and neither a preImageOpTime nor a postImageOpTime field.
+ void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.updatedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ // How the write is finalized is decided by the concrete fixture.
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "postImage"_sd);
+ }
+
+ // Shared test body: reports an update that stores its pre-image with
+ // retryableFindAndModifyLocation = kSideCollection to the observer, commits through the
+ // subclass-provided commit(), and checks that the entry returned by assertGetSingleOplogEntry()
+ // has needsRetryImage == "preImage" and neither a preImageOpTime nor a postImageOpTime field.
+ void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ CollectionUpdateArgs updateArgs;
+ updateArgs.stmtIds = {0};
+ updateArgs.preImageDoc = BSON("_id" << 0 << "data"
+ << "y");
+ updateArgs.update = BSON("$set" << BSON("data"
+ << "x"));
+ updateArgs.criteria = BSON("_id" << 0);
+ updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
+ OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
+ update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ opObserver().onUpdate(opCtx(), update);
+ // How the write is finalized is decided by the concrete fixture.
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+ }
+
+ // Shared test body: reports a delete (aboutToDelete() followed by onDelete()) whose args carry
+ // the deleted document and retryableFindAndModifyLocation = kSideCollection, commits through the
+ // subclass-provided commit(), and checks that the entry returned by assertGetSingleOplogEntry()
+ // has needsRetryImage == "preImage" and neither a preImageOpTime nor a postImageOpTime field.
+ void testRetryableFindAndModifyDeleteHasNeedsRetryImage() {
+ NamespaceString nss = {"test", "coll"};
+ const auto uuid = CollectionUUID::gen();
+
+ WriteUnitOfWork wunit(opCtx());
+ AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
+ const auto deletedDoc = BSON("_id" << 0 << "data"
+ << "x");
+ opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc);
+ OplogDeleteEntryArgs args;
+ args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
+ args.deletedDoc = &deletedDoc;
+ opObserver().onDelete(opCtx(), nss, uuid, 0, args);
+ // How the write is finalized is decided by the concrete fixture.
+ commit();
+
+ // Asserts that only a single oplog entry was created. In essence, we did not create any
+ // no-op image entries in the oplog.
+ const auto oplogEntry = assertGetSingleOplogEntry();
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
+ ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
+ ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
+ ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
+ "preImage"_sd);
+ }
+
+ virtual void commit() = 0;
+
+ virtual BSONObj assertGetSingleOplogEntry() = 0;
+};
+
+class OpObserverRetryableFindAndModifyOutsideTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
void setUp() override {
OpObserverTxnParticipantTest::setUp();
- txnParticipant().beginOrContinue(
- opCtx(), {txnNum()}, boost::none /* autocommit */, boost::none /* startTransaction */);
+ OpObserverTxnParticipantTest::setUpRetryableWrite();
}
- void tearDown() override {
- OpObserverTxnParticipantTest::tearDown();
+protected:
+ void commit() final{};
+
+ BSONObj assertGetSingleOplogEntry() final {
+ return getSingleOplogEntry(opCtx());
}
};
-TEST_F(OpObserverRetryableFindAndModifyTest,
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
- CollectionUpdateArgs updateArgs;
- updateArgs.stmtIds = {0};
- updateArgs.updatedDoc = BSON("_id" << 0 << "data"
- << "x");
- updateArgs.update = BSON("$set" << BSON("data"
- << "x"));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
-
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- opObserver().onUpdate(opCtx(), update);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "postImage"_sd);
-}
-
-TEST_F(OpObserverRetryableFindAndModifyTest,
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
- CollectionUpdateArgs updateArgs;
- updateArgs.stmtIds = {0};
- updateArgs.preImageDoc = BSON("_id" << 0 << "data"
- << "y");
- updateArgs.update = BSON("$set" << BSON("data"
- << "x"));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage;
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
-
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- opObserver().onUpdate(opCtx(), update);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "preImage"_sd);
-}
-
-TEST_F(OpObserverRetryableFindAndModifyTest, RetryableFindAndModifyDeleteHasNeedsRetryImage) {
- NamespaceString nss = {"test", "coll"};
- const auto uuid = CollectionUUID::gen();
+TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
- WriteUnitOfWork wunit(opCtx());
- AutoGetDb autoDb(opCtx(), nss.db(), MODE_X);
- const auto deletedDoc = BSON("_id" << 0 << "data"
- << "x");
- opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc);
- OplogDeleteEntryArgs args;
- args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection;
- args.deletedDoc = &deletedDoc;
- opObserver().onDelete(opCtx(), nss, uuid, 0, args);
- // Asserts that only a single oplog entry was created. In essence, we did not create any
- // no-op image entries in the oplog.
- const auto oplogEntry = getSingleOplogEntry(opCtx());
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName));
- ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName));
- ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName));
- ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName),
- "preImage"_sd);
-}
-
-OplogEntry findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) {
+/**
+ * Fixture that runs the shared retryable findAndModify checks inside a retryable internal
+ * transaction which is committed without being prepared.
+ */
+class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ // Base fixture first, then open the retryable internal transaction on top of it.
+ OpObserverTxnParticipantTest::setUp();
+ setUpRetryableInternalTransaction();
+ }
+
+protected:
+ // Commits the transaction through commitUnpreparedTransaction().
+ void commit() final {
+ auto* const operationContext = opCtx();
+ commitUnpreparedTransaction<OpObserverImpl>(operationContext, opObserver());
+ }
+
+ // The write is wrapped in an applyOps oplog entry, so return the entry nested inside it.
+ BSONObj assertGetSingleOplogEntry() final {
+ BSONObj innerEntry = getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ return innerEntry;
+ }
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
+/**
+ * Fixture that runs the shared retryable findAndModify checks inside a retryable internal
+ * transaction which is moved to the prepared state instead of being committed.
+ *
+ * setUp() switches the process-wide cluster role to ShardServer; tearDown() restores the previous
+ * role so the change does not leak into tests that run later in the same binary.
+ */
+class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest
+ : public OpObserverRetryableFindAndModifyTest {
+public:
+ void setUp() override {
+ // serverGlobalParams is global state: remember the current role before overwriting it.
+ // NOTE(review): presumably the prepare path requires the ShardServer role - confirm.
+ _originalClusterRole = serverGlobalParams.clusterRole;
+ serverGlobalParams.clusterRole = ClusterRole::ShardServer;
+ OpObserverTxnParticipantTest::setUp();
+ OpObserverTxnParticipantTest::setUpRetryableInternalTransaction();
+ }
+
+ void tearDown() override {
+ OpObserverRetryableFindAndModifyTest::tearDown();
+ // Undo the global mutation made in setUp().
+ serverGlobalParams.clusterRole = _originalClusterRole;
+ }
+
+protected:
+ // Reserves an oplog slot, transitions the participant to prepared with that slot, and then
+ // notifies the observer of the prepare together with the transaction's completed operations.
+ void commit() final {
+ const auto prepareSlot = repl::getNextOpTime(opCtx());
+ txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot);
+ auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
+ opObserver().onTransactionPrepare(
+ opCtx(),
+ {prepareSlot},
+ &txnOps,
+ txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ }
+
+ // The write is wrapped in an applyOps oplog entry, so return the entry nested inside it.
+ BSONObj assertGetSingleOplogEntry() final {
+ return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx());
+ }
+
+private:
+ // Cluster role observed before setUp() changed it; written back by tearDown().
+ decltype(serverGlobalParams.clusterRole) _originalClusterRole = serverGlobalParams.clusterRole;
+};
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) {
+ testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage();
+}
+
+TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest,
+ RetryableFindAndModifyDeleteHasNeedsRetryImage) {
+ testRetryableFindAndModifyDeleteHasNeedsRetryImage();
+}
+
+boost::optional<OplogEntry> findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) {
for (auto& oplog : oplogs) {
const auto& entry = assertGet(OplogEntry::parse(oplog));
if (entry.getTimestamp() == ts) {
return entry;
}
}
-
- FAIL("Not found.");
- // C++/clang isn't smart enough to know FAIL is guaranteed to throw.
- MONGO_UNREACHABLE;
+ return boost::none;
}
using StoreDocOption = CollectionUpdateArgs::StoreDocOption;
@@ -1619,6 +1803,8 @@ const bool kChangeStreamImagesDisabled = false;
const auto kNotRetryable = RetryableFindAndModifyLocation::kNone;
const auto kRecordInOplog = RetryableFindAndModifyLocation::kOplog;
const auto kRecordInSideCollection = RetryableFindAndModifyLocation::kSideCollection;
+
+const std::vector<bool> kInMultiDocumentTransactionCases{false, true};
} // namespace
struct UpdateTestCase {
@@ -1662,48 +1848,9 @@ struct UpdateTestCase {
}
};
-TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
- // Create a registry that only registers the Impl. It can be challenging to call methods on the
- // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due
- // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf.
- OpObserverRegistry opObserver;
- opObserver.addObserver(std::make_unique<OpObserverImpl>());
-
- NamespaceString nss("test", "coll");
- CollectionUUID uuid = CollectionUUID::gen();
-
- std::vector<UpdateTestCase> cases = {
- // Regular updates.
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1},
- {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
- // FindAndModify asking for a preImage.
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
- // FindAndModify asking for a postImage.
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3},
- {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
-
- for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
- const auto& testCase = cases[testIdx];
+class OnUpdateOutputsTest : public OpObserverTest {
+protected:
+ void logTestCase(const UpdateTestCase& testCase) {
LOGV2(5739902,
"UpdateTestCase",
"ImageType"_attr = testCase.getImageTypeStr(),
@@ -1712,98 +1859,87 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
"RetryableFindAndModifyLocation"_attr =
testCase.getRetryableFindAndModifyLocationStr(),
"ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
- CollectionUpdateArgs updateArgs;
- updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
- updateArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ void initializeOplogUpdateEntryArgs(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ OplogUpdateEntryArgs* update) {
+ update->updateArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ update->updateArgs->changeStreamPreAndPostImagesEnabledForCollection =
testCase.changeStreamImagesEnabled;
- auto opCtxRaii = cc().makeOperationContext();
- OperationContext* opCtx = opCtxRaii.get();
- // Phase 1: Clearing any state and setting up fixtures/the update call.
- resetOplogAndTransactions(opCtx);
-
- OplogUpdateEntryArgs update(&updateArgs, nss, uuid);
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
switch (testCase.retryableOptions) {
case kNotRetryable:
- updateArgs.stmtIds = {kUninitializedStmtId};
+ update->updateArgs->stmtIds = {kUninitializedStmtId};
break;
case kRecordInOplog:
- update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog;
- updateArgs.stmtIds = {1};
+ update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog;
+ update->updateArgs->stmtIds = {1};
break;
case kRecordInSideCollection:
- update.retryableFindAndModifyLocation =
+ update->retryableFindAndModifyLocation =
RetryableFindAndModifyLocation::kSideCollection;
- updateArgs.stmtIds = {1};
- if (testCase.alwaysRecordPreImages &&
- testCase.retryableOptions == kRecordInSideCollection) {
+ update->updateArgs->stmtIds = {1};
+ if (testCase.retryableOptions == kRecordInSideCollection) {
// 'getNextOpTimes' requires us to be inside a WUOW when reserving oplog slots.
WriteUnitOfWork wuow(opCtx);
auto reservedSlots = repl::getNextOpTimes(opCtx, 3);
- updateArgs.oplogSlots = reservedSlots;
+ update->updateArgs->oplogSlots = reservedSlots;
}
break;
}
- if (testCase.retryableOptions != kNotRetryable) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- }
-
- updateArgs.preImageDoc = boost::none;
+ update->updateArgs->preImageDoc = boost::none;
if (testCase.imageType == StoreDocOption::PreImage || testCase.alwaysRecordPreImages ||
testCase.changeStreamImagesEnabled) {
- updateArgs.preImageDoc = BSON("_id" << 0 << "preImage" << true);
+ update->updateArgs->preImageDoc = BSON("_id" << 0 << "preImage" << true);
}
-
- updateArgs.updatedDoc = BSON("_id" << 0 << "postImage" << true);
- updateArgs.update =
+ update->updateArgs->updatedDoc = BSON("_id" << 0 << "postImage" << true);
+ update->updateArgs->update =
BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1));
- updateArgs.criteria = BSON("_id" << 0);
- updateArgs.storeDocOption = testCase.imageType;
-
- // Phase 2: Call the code we're testing.
- WriteUnitOfWork wuow(opCtx);
- AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX);
- opObserver.onUpdate(opCtx, update);
- wuow.commit();
-
- // Phase 3: Analyze the results:
-
- // This `getNOplogEntries` also asserts that all oplogs are retrieved.
- std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
- // Entries are returned in ascending timestamp order.
- const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back()));
+ update->updateArgs->criteria = BSON("_id" << 0);
+ update->updateArgs->storeDocOption = testCase.imageType;
+ }
+ // Verifies the pre-image linkage of 'updateOplogEntry'.
+ //
+ // A pre-image no-op is expected in the oplog when the test case always records pre-images, or
+ // when it asks for a pre-image and the retryable location is the oplog. In that case the entry's
+ // preImageOpTime must be set, non-null, and resolve to an entry in 'oplogs' whose object equals
+ // update.updateArgs->preImageDoc. Otherwise the entry must not carry a preImageOpTime at all.
+ void checkPreImageInOplogIfNeeded(const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
const bool checkPreImageInOplog = testCase.alwaysRecordPreImages ||
(testCase.imageType == StoreDocOption::PreImage &&
testCase.retryableOptions == kRecordInOplog);
if (checkPreImageInOplog) {
- ASSERT(actualOp.getPreImageOpTime());
- const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp();
+ ASSERT(updateOplogEntry.getPreImageOpTime());
+ const Timestamp preImageOpTime = updateOplogEntry.getPreImageOpTime()->getTimestamp();
ASSERT_FALSE(preImageOpTime.isNull());
- OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime);
+ // findByTimestamp() yields boost::none when no entry matches. Assert before dereferencing
+ // so a missing pre-image entry is reported as a test failure rather than undefined behavior.
+ const auto preImageLookup = findByTimestamp(oplogs, preImageOpTime);
+ ASSERT(preImageLookup);
+ const OplogEntry& preImage = *preImageLookup;
ASSERT_BSONOBJ_EQ(update.updateArgs->preImageDoc.get(), preImage.getObject());
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPreImageOpTime());
}
+ }
+ // Verifies the post-image linkage of 'updateOplogEntry'.
+ //
+ // A post-image no-op is expected in the oplog only when the test case asks for a post-image and
+ // the retryable location is the oplog. In that case the entry's postImageOpTime must be set,
+ // non-null, and resolve to an entry in 'oplogs' whose object equals
+ // update.updateArgs->updatedDoc. Otherwise the entry must not carry a postImageOpTime at all.
+ void checkPostImageInOplogIfNeeded(const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
const bool checkPostImageInOplog = testCase.imageType == StoreDocOption::PostImage &&
testCase.retryableOptions == kRecordInOplog;
if (checkPostImageInOplog) {
- ASSERT(actualOp.getPostImageOpTime());
- const Timestamp postImageOpTime = actualOp.getPostImageOpTime()->getTimestamp();
+ ASSERT(updateOplogEntry.getPostImageOpTime());
+ const Timestamp postImageOpTime = updateOplogEntry.getPostImageOpTime()->getTimestamp();
ASSERT_FALSE(postImageOpTime.isNull());
- OplogEntry postImage = findByTimestamp(oplogs, postImageOpTime);
+ // findByTimestamp() yields boost::none when no entry matches. Assert before dereferencing
+ // so a missing post-image entry is reported as a test failure rather than undefined behavior.
+ const auto postImageLookup = findByTimestamp(oplogs, postImageOpTime);
+ ASSERT(postImageLookup);
+ const OplogEntry& postImage = *postImageLookup;
ASSERT_BSONOBJ_EQ(update.updateArgs->updatedDoc, postImage.getObject());
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getPostImageOpTime());
}
+ }
+ void checkSideCollectionIfNeeded(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& updateOplogEntry) {
bool checkSideCollection =
testCase.isFindAndModify() && testCase.retryableOptions == kRecordInSideCollection;
if (checkSideCollection && testCase.alwaysRecordPreImages &&
@@ -1813,32 +1949,180 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) {
// in the side collection.
checkSideCollection = false;
}
-
if (checkSideCollection) {
repl::ImageEntry imageEntry =
- getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId());
+ getImageEntryFromSideCollection(opCtx, *updateOplogEntry.getSessionId());
const BSONObj& expectedImage = testCase.imageType == StoreDocOption::PreImage
? update.updateArgs->preImageDoc.get()
: update.updateArgs->updatedDoc;
ASSERT_BSONOBJ_EQ(expectedImage, imageEntry.getImage());
+ ASSERT(imageEntry.getImageKind() == updateOplogEntry.getNeedsRetryImage());
if (testCase.imageType == StoreDocOption::PreImage) {
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
} else {
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage);
}
+
+ // If 'updateOplogEntry' has opTime T, opTime T-1 must be reserved for a potential forged
+ // noop oplog entry for the pre/postImage written to the side collection.
+ const Timestamp forgeNoopTimestamp = updateOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
+ } else {
+ ASSERT_FALSE(updateOplogEntry.getNeedsRetryImage());
+ if (updateOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *updateOplogEntry.getSessionId()));
+ } else {
+ // Session id is missing only for the non-retryable option.
+ ASSERT(testCase.retryableOptions == kNotRetryable);
+ }
}
+ }
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const UpdateTestCase& testCase,
+ const OplogUpdateEntryArgs& update,
+ const OplogEntry& updateOplogEntry) {
if (testCase.changeStreamImagesEnabled) {
BSONObj container;
- ChangeStreamPreImageId preImageId(uuid, actualOp.getOpTime().getTimestamp(), 0);
+ ChangeStreamPreImageId preImageId(
+ _uuid, updateOplogEntry.getOpTime().getTimestamp(), 0);
ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
const BSONObj& expectedImage = update.updateArgs->preImageDoc.get();
ASSERT_BSONOBJ_EQ(expectedImage, preImage.getPreImage());
- ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime());
+ ASSERT_EQ(updateOplogEntry.getWallClockTime(), preImage.getOperationTime());
+ }
+ }
+
+ std::vector<UpdateTestCase> _cases = {
+ // Regular updates.
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1},
+ {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a preImage.
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2},
+ // FindAndModify asking for a postImage.
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3},
+ {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{"test", "coll"};
+ const CollectionUUID _uuid = CollectionUUID::gen();
+};
+
+TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
}
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto updateOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry);
}
}
+TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) {
+ continue;
+ }
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the update call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ CollectionUpdateArgs updateArgs;
+ OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid);
+ initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ opObserver.onUpdate(opCtx, updateEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto updateOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry);
+ }
+}
struct InsertTestCase {
bool isRetryableWrite;
@@ -1882,17 +2166,9 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) {
toInsert.emplace_back(stmtId, BSON("_id" << stmtIdx));
}
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
if (testCase.isRetryableWrite) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
}
// Phase 2: Call the code we're testing.
@@ -1971,31 +2247,10 @@ struct DeleteTestCase {
}
};
-TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) {
- // Create a registry that only registers the Impl. It can be challenging to call methods on the
- // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due
- // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf.
- OpObserverRegistry opObserver;
- opObserver.addObserver(std::make_unique<OpObserverImpl>());
-
- NamespaceString nss("test", "coll");
- CollectionUUID uuid = CollectionUUID::gen();
-
- // For the DeleteTestCase, we add a "pre-image" deletedDoc when using `kRecordInOplog` and
- // `kRecordInSideCollection`.
- std::vector<DeleteTestCase> cases{
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
- {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
- {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
- {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
- {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+class OnDeleteOutputsTest : public OpObserverTest {
- for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) {
- const auto& testCase = cases[testIdx];
+protected:
+ void logTestCase(const DeleteTestCase& testCase) {
LOGV2(5739905,
"DeleteTestCase",
"PreImageRecording"_attr = testCase.alwaysRecordPreImages,
@@ -2003,111 +2258,214 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) {
"RetryableFindAndModifyLocation"_attr =
testCase.getRetryableFindAndModifyLocationStr(),
"ExpectedOplogEntries"_attr = testCase.numOutputOplogs);
+ }
- OplogDeleteEntryArgs deleteArgs;
- deleteArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
- deleteArgs.changeStreamPreAndPostImagesEnabledForCollection =
+ void initializeOplogDeleteEntryArgs(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ OplogDeleteEntryArgs* deleteArgs) {
+ deleteArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages;
+ deleteArgs->changeStreamPreAndPostImagesEnabledForCollection =
testCase.changeStreamImagesEnabled;
- auto opCtxRaii = cc().makeOperationContext();
- OperationContext* opCtx = opCtxRaii.get();
- // Phase 1: Clearing any state and setting up fixtures/the update call.
- resetOplogAndTransactions(opCtx);
-
- boost::optional<MongoDOperationContextSession> contextSession;
- boost::optional<TransactionParticipant::Participant> txnParticipant;
- if (testCase.retryableOptions != kNotRetryable) {
- opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
- opCtx->setTxnNumber(TxnNumber(testIdx));
- contextSession.emplace(opCtx);
- txnParticipant.emplace(TransactionParticipant::get(opCtx));
- txnParticipant->beginOrContinue(opCtx,
- {TxnNumber(testIdx)},
- boost::none /* autocommit */,
- boost::none /* startTransaction */);
- }
switch (testCase.retryableOptions) {
case kNotRetryable:
- deleteArgs.retryableFindAndModifyLocation = kNotRetryable;
+ deleteArgs->retryableFindAndModifyLocation = kNotRetryable;
break;
case kRecordInOplog:
- deleteArgs.retryableFindAndModifyLocation = kRecordInOplog;
+ deleteArgs->retryableFindAndModifyLocation = kRecordInOplog;
break;
case kRecordInSideCollection:
- deleteArgs.retryableFindAndModifyLocation = kRecordInSideCollection;
+ deleteArgs->retryableFindAndModifyLocation = kRecordInSideCollection;
break;
}
-
- const BSONObj deletedDoc = BSON("_id" << 0 << "valuePriorToDelete"
- << "marvelous");
if (testCase.isRetryable() || testCase.alwaysRecordPreImages ||
testCase.changeStreamImagesEnabled) {
- deleteArgs.deletedDoc = &deletedDoc;
+ deleteArgs->deletedDoc = &_deletedDoc;
}
- // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
- // of setting of `documentKey` on the delete for sharding purposes.
- // `OpObserverImpl::onDelete` asserts its existence.
- documentKeyDecoration(opCtx).emplace(deletedDoc["_id"].wrap(), boost::none);
- StmtId deleteStmtId = kUninitializedStmtId;
- if (testCase.isRetryable()) {
- deleteStmtId = {1};
- }
-
- // Phase 2: Call the code we're testing.
- WriteUnitOfWork wuow(opCtx);
- AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX);
- opObserver.onDelete(opCtx, nss, uuid, deleteStmtId, deleteArgs);
- wuow.commit();
-
- // Phase 3: Analyze the results:
-
- // This `getNOplogEntries` also asserts that all oplogs are retrieved.
- std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
- // Entries are returned in ascending timestamp order.
- const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back()));
+ }
+ void checkPreImageInOplogIfNeeded(const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry) {
const bool checkPreImageInOplog = deleteArgs.preImageRecordingEnabledForCollection ||
deleteArgs.retryableFindAndModifyLocation == kRecordInOplog;
if (checkPreImageInOplog) {
- ASSERT(actualOp.getPreImageOpTime());
- const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp();
+ ASSERT(deleteOplogEntry.getPreImageOpTime());
+ const Timestamp preImageOpTime = deleteOplogEntry.getPreImageOpTime()->getTimestamp();
ASSERT_FALSE(preImageOpTime.isNull());
- OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime);
- ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getObject());
+ OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime);
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getObject());
} else {
- ASSERT_FALSE(actualOp.getPreImageOpTime());
+ ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime());
}
+ }
+ void checkSideCollectionIfNeeded(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const std::vector<BSONObj>& oplogs,
+ const OplogEntry& deleteOplogEntry) {
bool didWriteInSideCollection =
deleteArgs.retryableFindAndModifyLocation == kRecordInSideCollection &&
!deleteArgs.preImageRecordingEnabledForCollection;
if (didWriteInSideCollection) {
repl::ImageEntry imageEntry =
- getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId());
+ getImageEntryFromSideCollection(opCtx, *deleteOplogEntry.getSessionId());
+ ASSERT(imageEntry.getImageKind() == deleteOplogEntry.getNeedsRetryImage());
ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage);
- ASSERT_BSONOBJ_EQ(deletedDoc, imageEntry.getImage());
+ ASSERT_BSONOBJ_EQ(_deletedDoc, imageEntry.getImage());
+
+ // If 'deleteOplogEntry' has opTime T, opTime T-1 must be reserved for a potential
+ // forged noop oplog entry for the preImage written to the side collection.
+ const Timestamp forgeNoopTimestamp = deleteOplogEntry.getTimestamp() - 1;
+ ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp));
} else {
- if (actualOp.getSessionId()) {
- ASSERT_FALSE(didWriteImageEntryToSideCollection(opCtx, *actualOp.getSessionId()));
+ ASSERT_FALSE(deleteOplogEntry.getNeedsRetryImage());
+ if (deleteOplogEntry.getSessionId()) {
+ ASSERT_FALSE(
+ didWriteImageEntryToSideCollection(opCtx, *deleteOplogEntry.getSessionId()));
} else {
// Session id is missing only for non-retryable option.
ASSERT(testCase.retryableOptions == kNotRetryable);
}
}
+ }
- const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp();
- ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0);
+ void checkChangeStreamImagesIfNeeded(OperationContext* opCtx,
+ const DeleteTestCase& testCase,
+ const OplogDeleteEntryArgs& deleteArgs,
+ const OplogEntry& deleteOplogEntry) {
+ const Timestamp preImageOpTime = deleteOplogEntry.getOpTime().getTimestamp();
+ ChangeStreamPreImageId preImageId(_uuid, preImageOpTime, 0);
if (deleteArgs.changeStreamPreAndPostImagesEnabledForCollection) {
BSONObj container;
ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container);
- ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getPreImage());
- ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime());
+ ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getPreImage());
+ ASSERT_EQ(deleteOplogEntry.getWallClockTime(), preImage.getOperationTime());
} else {
ASSERT_FALSE(didWriteDeletedDocToPreImagesCollection(opCtx, preImageId));
}
}
+
+ std::vector<DeleteTestCase> _cases{
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2},
+ {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2},
+ {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}};
+
+ const NamespaceString _nss{"test", "coll"};
+ const CollectionUUID _uuid = CollectionUUID::gen();
+ const BSONObj _deletedDoc = BSON("_id" << 0 << "valuePriorToDelete"
+ << "marvelous");
+};
+
+TEST_F(OnDeleteOutputsTest, TestNonTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
+ // of setting `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(
+ opCtx, _nss, _uuid, testCase.isRetryable() ? 1 : kUninitializedStmtId, deleteEntryArgs);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto deleteOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry);
+ }
}
+TEST_F(OnDeleteOutputsTest, TestTransactionFundamentalOnDeleteOutputs) {
+ // Create a registry that only registers the Impl. It can be challenging to call methods on
+ // the Impl directly. It falls into cases where `ReservedTimes` is expected to be
+ // instantiated. Due to strong encapsulation, we use the registry that manages the
+ // `ReservedTimes` on our behalf.
+ OpObserverRegistry opObserver;
+ opObserver.addObserver(std::make_unique<OpObserverImpl>());
+
+ for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) {
+ const auto& testCase = _cases[testIdx];
+ if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) {
+ continue;
+ }
+ logTestCase(testCase);
+
+ auto opCtxRaii = cc().makeOperationContext();
+ OperationContext* opCtx = opCtxRaii.get();
+
+ // Phase 1: Clearing any state and setting up fixtures/the delete call.
+ resetOplogAndTransactions(opCtx);
+
+ std::unique_ptr<MongoDOperationContextSession> contextSession;
+ if (testCase.isRetryable()) {
+ beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ } else {
+ beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession);
+ }
+
+ // Phase 2: Call the code we're testing.
+ OplogDeleteEntryArgs deleteEntryArgs;
+ initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs);
+ const auto stmtId = testCase.isRetryable() ? 1 : kUninitializedStmtId;
+
+ WriteUnitOfWork wuow(opCtx);
+ AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX);
+ // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect
+ // of setting `documentKey` on the delete for sharding purposes.
+ // `OpObserverImpl::onDelete` asserts its existence.
+ documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none);
+ opObserver.onDelete(opCtx, _nss, _uuid, stmtId, deleteEntryArgs);
+ commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver);
+ wuow.commit();
+
+ // Phase 3: Analyze the results:
+ // This `getNOplogEntries` also asserts that all oplogs are retrieved.
+ std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs);
+ // Entries are returned in ascending timestamp order.
+ auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back()));
+ auto deleteOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry);
+ checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry);
+ }
+}
TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
const NamespaceString nss1("testDB", "testColl");
diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl
index 2254f4fcb58..415b50f9ea1 100644
--- a/src/mongo/db/ops/write_ops.idl
+++ b/src/mongo/db/ops/write_ops.idl
@@ -417,6 +417,10 @@ commands:
description: "When true, returns the modified document rather than the original."
type: safeBool
optional: true
+ stmtId:
+ description: "The statement number for this findAndModify operation."
+ type: int
+ optional: true
bypassDocumentValidation:
description: "Enables the operation to bypass document validation. This lets you
write documents that do not meet the validation requirements."
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 0bffa5e01b3..7ee378a0f9d 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -1550,17 +1550,18 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
if (op.getNeedsRetryImage()) {
- writeToImageCollection(opCtx,
- op.getSessionId().get(),
- op.getTxnNumber().get(),
- op.getTimestamp(),
- op.getNeedsRetryImage().get(),
- // If we did not request an image because we're in
- // initial sync, the value passed in here is conveniently
- // the empty BSONObj.
- ur.requestedDocImage,
- getInvalidatingReason(mode, isDataConsistent),
- &upsertConfigImage);
+ writeToImageCollection(
+ opCtx,
+ op.getSessionId().get(),
+ op.getTxnNumber().get(),
+ op.getTimestampForRetryImage().value_or(op.getTimestamp()),
+ op.getNeedsRetryImage().get(),
+ // If we did not request an image because we're in
+ // initial sync, the value passed in here is conveniently
+ // the empty BSONObj.
+ ur.requestedDocImage,
+ getInvalidatingReason(mode, isDataConsistent),
+ &upsertConfigImage);
}
wuow.commit();
@@ -1625,14 +1626,15 @@ Status applyOperation_inlock(OperationContext* opCtx,
// isn't strictly necessary for correctness -- the `config.transactions` table
// is responsible for whether to retry. The motivation here is to simply reduce
// the number of states related documents in the two collections can be in.
- writeToImageCollection(opCtx,
- op.getSessionId().get(),
- op.getTxnNumber().get(),
- op.getTimestamp(),
- repl::RetryImageEnum::kPreImage,
- result.requestedPreImage.value_or(BSONObj()),
- getInvalidatingReason(mode, isDataConsistent),
- &upsertConfigImage);
+ writeToImageCollection(
+ opCtx,
+ op.getSessionId().get(),
+ op.getTxnNumber().get(),
+ op.getTimestampForRetryImage().value_or(op.getTimestamp()),
+ repl::RetryImageEnum::kPreImage,
+ result.requestedPreImage.value_or(BSONObj()),
+ getInvalidatingReason(mode, isDataConsistent),
+ &upsertConfigImage);
}
if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary) {
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index 90156b855b1..ce141a2b987 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -85,6 +85,12 @@ public:
}
void setPreImage(BSONObj value) {
+ if (!_fullPreImage.isEmpty()) {
+ uassert(6054003,
+ "Cannot set pre-image more than once",
+ _fullPreImage.woCompare(value) == 0);
+ return;
+ }
_fullPreImage = std::move(value);
}
@@ -93,6 +99,12 @@ public:
}
void setPostImage(BSONObj value) {
+ if (!_fullPostImage.isEmpty()) {
+ uassert(6054004,
+ "Cannot set post-image more than once",
+ _fullPostImage.woCompare(value) == 0);
+ return;
+ }
_fullPostImage = std::move(value);
}
@@ -117,6 +129,9 @@ public:
private:
BSONObj _preImageDocumentKey;
+
+ // Used for storing the pre-image and post-image for the operation in-memory regardless of where
+ // the images should be persisted.
BSONObj _fullPreImage;
BSONObj _fullPostImage;
};
@@ -563,6 +578,14 @@ public:
void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp);
void setPostImageOp(const BSONObj& postImageOp);
+ void setTimestampForRetryImage(Timestamp value) & {
+ _timestampForRetryImage = std::move(value);
+ }
+
+ boost::optional<Timestamp> getTimestampForRetryImage() const {
+ return _timestampForRetryImage;
+ }
+
std::string toStringForLogging() const;
/**
@@ -624,6 +647,17 @@ private:
std::shared_ptr<DurableOplogEntry> _postImageOp;
bool _isForCappedCollection = false;
+
+ // During oplog application on secondaries, oplog entries extracted from each applyOps oplog
+ // entry for a transaction are given the timestamp of the terminal applyOps oplog entry.
+ // Similarly, during oplog replay, oplog entries extracted from each applyOps oplog entry for
+ // a transaction are given the timestamp of the commit oplog entry. As a result, some of those
+ // oplog entries may have a timestamp that is not equal to the timestamp of the applyOps oplog
+ // entry that they correspond to, and it is incorrect to use that timestamp when writing image
+ // collection entries. As such, during transaction oplog application, _timestampForRetryImage
+ // will be used to store the timestamp of the applyOps oplog entry that this operation
+ // actually corresponds to if an image collection entry is expected to be written.
+ boost::optional<Timestamp> _timestampForRetryImage = boost::none;
};
std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o);
diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp
index d2fb7f0a178..a838e8566f6 100644
--- a/src/mongo/db/repl/transaction_oplog_application.cpp
+++ b/src/mongo/db/repl/transaction_oplog_application.cpp
@@ -319,6 +319,12 @@ std::pair<std::vector<OplogEntry>, bool> _readTransactionOperationsFromOplogChai
invariant(operationEntry.isPartialTransaction());
auto prevOpsEnd = ops.size();
repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops);
+ for (auto opIter = ops.begin() + prevOpsEnd; opIter != ops.end(); ++opIter) {
+ auto& op = *opIter;
+ if (op.getNeedsRetryImage()) {
+ op.setTimestampForRetryImage(operationEntry.getTimestamp());
+ }
+ }
// Because BSONArrays do not have fast way of determining size without iterating through
// them, and we also have no way of knowing how many oplog entries are in a transaction
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 5af1f1ef48a..b90c1bf4647 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1432,6 +1432,10 @@ void TransactionParticipant::Participant::addTransactionOperation(
p().transactionOperationBytes += operation.getPreImage().objsize();
++p().numberOfPrePostImagesToWrite;
}
+ if (!operation.getPostImage().isEmpty()) {
+ p().transactionOperationBytes += operation.getPostImage().objsize();
+ ++p().numberOfPrePostImagesToWrite;
+ }
auto transactionSizeLimitBytes = gTransactionSizeLimitBytes.load();
uassert(ErrorCodes::TransactionTooLarge,
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index e3be7aa0d03..76bb35f37f9 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -679,6 +679,10 @@ public:
return p().transactionOperations;
}
+ size_t getNumberOfPrePostImagesToWriteForTest() const {
+ return p().numberOfPrePostImagesToWrite;
+ }
+
const Locker* getTxnResourceStashLockerForTest() const {
invariant(o().txnResourceStash);
return o().txnResourceStash->locker();