diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-10-26 04:04:34 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-11-20 04:49:23 +0000 |
commit | e3b5c6069197d478929449eec87f8bfcc58bc7bd (patch) | |
tree | 214e9c5d8f6ea7f431f0eb537f427e0e4040391c /src/mongo | |
parent | 95ba764c2c7a787e117536fe5632fe484f9178c8 (diff) | |
download | mongo-e3b5c6069197d478929449eec87f8bfcc58bc7bd.tar.gz |
SERVER-60540 Add retryability support for internal transactions for findAndModify
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/catalog/collection_impl.cpp | 130 | ||||
-rw-r--r-- | src/mongo/db/commands/find_and_modify.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 198 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 946 | ||||
-rw-r--r-- | src/mongo/db/ops/write_ops.idl | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 40 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 34 | ||||
-rw-r--r-- | src/mongo/db/repl/transaction_oplog_application.cpp | 6 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 4 |
11 files changed, 958 insertions, 411 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index fba19caaced..1fe478e17d5 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -2473,6 +2473,7 @@ if wiredtiger: '$BUILD_DIR/mongo/db/catalog/catalog_test_fixture', '$BUILD_DIR/mongo/db/catalog/import_collection_oplog_entry', '$BUILD_DIR/mongo/db/catalog/index_build_entry_idl', + '$BUILD_DIR/mongo/db/catalog/local_oplog_info', '$BUILD_DIR/mongo/db/mongohasher', '$BUILD_DIR/mongo/db/query/common_query_enums_and_helpers', '$BUILD_DIR/mongo/db/query/query_test_service_context', diff --git a/src/mongo/db/catalog/collection_impl.cpp b/src/mongo/db/catalog/collection_impl.cpp index 7ba1084c750..8d6afd948ab 100644 --- a/src/mongo/db/catalog/collection_impl.cpp +++ b/src/mongo/db/catalog/collection_impl.cpp @@ -260,32 +260,44 @@ Status validateChangeStreamPreAndPostImagesOptionIsPermitted(const NamespaceStri return Status::OK(); } +/** + * Returns true if we are running retryable write or retryable internal multi-document transaction. + */ bool isRetryableWrite(OperationContext* opCtx) { + if (!opCtx->writesAreReplicated() || !opCtx->isRetryableWrite()) { + return false; + } auto txnParticipant = TransactionParticipant::get(opCtx); - const bool inMultiDocumentTransaction = txnParticipant && txnParticipant.transactionIsOpen(); - return !inMultiDocumentTransaction && opCtx->writesAreReplicated() && opCtx->getTxnNumber(); + return txnParticipant && + (!opCtx->inMultiDocumentTransaction() || txnParticipant.transactionIsOpen()); +} + +bool shouldStoreImageInSideCollection(OperationContext* opCtx) { + // Check if we're in a retryable write that should save the image to `config.image_collection`. + // This is the only time `storeFindAndModifyImagesInSideCollection` may be queried for this + // transaction. + return isRetryableWrite(opCtx) && + repl::feature_flags::gFeatureFlagRetryableFindAndModify.isEnabledAndIgnoreFCV() && + repl::gStoreFindAndModifyImagesInSideCollection.load(); } std::vector<OplogSlot> reserveOplogSlotsForRetryableFindAndModify(OperationContext* opCtx, const int numSlots) { - if (isRetryableWrite(opCtx)) { - // Check if we're in a retryable write that should save the image to - // `config.image_collection`. This is the only time - // `storeFindAndModifyImagesInSideCollection` may be queried for this transaction. - const bool storeImageInSideCollection = - repl::feature_flags::gFeatureFlagRetryableFindAndModify.isEnabledAndIgnoreFCV() && - repl::gStoreFindAndModifyImagesInSideCollection.load(); - if (storeImageInSideCollection) { - // We reserve oplog slots here, expecting the slot with the greatest timestmap (say TS) - // to be used as the oplog timestamp. Tenant migrations and resharding will forge no-op - // image oplog entries and set the timestamp for these synthetic entries to be TS - 1. - auto oplogInfo = LocalOplogInfo::get(opCtx); - auto slots = oplogInfo->getNextOpTimes(opCtx, numSlots); - uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp())); - return slots; - } + invariant(isRetryableWrite(opCtx)); + + // For retryable findAndModify running in a multi-document transaction, we will reserve the + // oplog entries when the transaction prepares or commits without prepare. + if (opCtx->inMultiDocumentTransaction()) { + return {}; } - return {}; + + // We reserve oplog slots here, expecting the slot with the greatest timestmap (say TS) to be + // used as the oplog timestamp. Tenant migrations and resharding will forge no-op image oplog + // entries and set the timestamp for these synthetic entries to be TS - 1. + auto oplogInfo = LocalOplogInfo::get(opCtx); + auto slots = oplogInfo->getNextOpTimes(opCtx, numSlots); + uassertStatusOK(opCtx->recoveryUnit()->setTimestamp(slots.back().getTimestamp())); + return slots; } @@ -1168,14 +1180,16 @@ void CollectionImpl::deleteDocument(OperationContext* opCtx, } std::vector<OplogSlot> oplogSlots; - if (storeDeletedDoc == Collection::StoreDeletedDoc::On && !getRecordPreImages()) { - oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, 2); - } auto retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kNone; - if (storeDeletedDoc == Collection::StoreDeletedDoc::On && isRetryableWrite(opCtx)) { + if (storeDeletedDoc == Collection::StoreDeletedDoc::On && !getRecordPreImages() && + isRetryableWrite(opCtx)) { + const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx); retryableFindAndModifyLocation = - (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog - : RetryableFindAndModifyLocation::kSideCollection); + (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection + : RetryableFindAndModifyLocation::kOplog); + if (storeImageInSideCollection) { + oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, 2); + } } OplogDeleteEntryArgs deleteArgs{nullptr /* deletedDoc */, fromMigrate, @@ -1284,23 +1298,24 @@ RecordId CollectionImpl::updateDocument(OperationContext* opCtx, const bool setNeedsRetryImageOplogField = args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; if (args->oplogSlots.empty() && setNeedsRetryImageOplogField) { - // If the update is part of a retryable write and we expect to be storing the pre- or post- - // image in a side collection, then we must reserve oplog slots in advance. We expect to - // use the reserved oplog slots as follows, where TS is the greatest timestamp of - // 'oplogSlots': - // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we must - // account for storing a pre-image in the oplog and an eventual synthetic no-op - // image oplog used by tenant migrations/resharding. - // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set - // the entry timestamps to TS - 1. - // TS: The timestamp given to the update oplog entry. - const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2; - const auto oplogSlots = - reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve); - args->oplogSlots = oplogSlots; + const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx); onUpdateArgs.retryableFindAndModifyLocation = - (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog - : RetryableFindAndModifyLocation::kSideCollection); + (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection + : RetryableFindAndModifyLocation::kOplog); + if (storeImageInSideCollection) { + // If the update is part of a retryable write and we expect to be storing the pre- or + // post-image in a side collection, then we must reserve oplog slots in advance. We + // expect to use the reserved oplog slots as follows, where TS is the greatest + // timestamp of 'oplogSlots': + // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we + // must account for storing a pre-image in the oplog and an eventual synthetic + // no-op image oplog used by tenant migrations/resharding. + // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set + // the entry timestamps to TS - 1. + // TS: The timestamp given to the update oplog entry. + const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2; + args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve); + } } else { // Retryable findAndModify commands should not reserve oplog slots before entering this // function since tenant migrations and resharding rely on always being able to set @@ -1365,23 +1380,24 @@ StatusWith<RecordData> CollectionImpl::updateDocumentWithDamages( const bool setNeedsRetryImageOplogField = args->storeDocOption != CollectionUpdateArgs::StoreDocOption::None; if (args->oplogSlots.empty() && setNeedsRetryImageOplogField) { - // If the update is part of a retryable write and we expect to be storing the pre- or post- - // image in a side collection, then we must reserve oplog slots in advance. We expect to - // use the reserved oplog slots as follows, where TS is the greatest timestamp of - // 'oplogSlots': - // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we must - // account for storing a pre-image in the oplog and an eventual synthetic no-op - // image oplog used by tenant migrations/resharding. - // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set - // the entry timestamps to TS - 1. - // TS: The timestamp given to the update oplog entry. - const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2; - const auto oplogSlots = - reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve); - args->oplogSlots = oplogSlots; + const bool storeImageInSideCollection = shouldStoreImageInSideCollection(opCtx); onUpdateArgs.retryableFindAndModifyLocation = - (oplogSlots.empty() ? RetryableFindAndModifyLocation::kOplog - : RetryableFindAndModifyLocation::kSideCollection); + (storeImageInSideCollection ? RetryableFindAndModifyLocation::kSideCollection + : RetryableFindAndModifyLocation::kOplog); + if (storeImageInSideCollection) { + // If the update is part of a retryable write and we expect to be storing the pre- or + // post-image in a side collection, then we must reserve oplog slots in advance. We + // expect to use the reserved oplog slots as follows, where TS is the greatest + // timestamp of 'oplogSlots': + // TS - 2: If 'getRecordPreImages()' is true, we reserve an extra oplog slot in case we + // must account for storing a pre-image in the oplog and an eventual synthetic + // no-op image oplog used by tenant migrations/resharding. + // TS - 1: Tenant migrations and resharding will forge no-op image oplog entries and set + // the entry timestamps to TS - 1. + // TS: The timestamp given to the update oplog entry. + const auto numSlotsToReserve = getRecordPreImages() ? 3 : 2; + args->oplogSlots = reserveOplogSlotsForRetryableFindAndModify(opCtx, numSlotsToReserve); + } } else { // Retryable findAndModify commands should not reserve oplog slots before entering this // function since tenant migrations and resharding rely on always being able to set diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index 2b835de62ee..8698fcf9f75 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -639,7 +639,7 @@ write_ops::FindAndModifyCommandReply CmdFindAndModify::Invocation::typedRun( !(inTransaction && replCoord->isOplogDisabledFor(opCtx, nsString))); - const auto stmtId = 0; + const auto stmtId = req.getStmtId().value_or(0); if (opCtx->isRetryableWrite()) { const auto txnParticipant = TransactionParticipant::get(opCtx); if (auto entry = txnParticipant.checkStatementExecuted(opCtx, stmtId)) { diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index bb7b7508f0d..e92a2582bc6 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -169,6 +169,12 @@ struct OpTimeBundle { Date_t wallClockTime; }; +struct ImageBundle { + repl::RetryImageEnum imageKind; + BSONObj imageDoc; + Timestamp timestamp; +}; + /** * Write oplog entry(ies) for the update operation. */ @@ -682,14 +688,29 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg args.nss, args.uuid, args.updateArgs->update, args.updateArgs->criteria); if (inRetryableInternalTransaction) { operation.setInitializedStatementIds(args.updateArgs->stmtIds); - } - operation.setDestinedRecipient( - shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); - - if (args.updateArgs->preImageRecordingEnabledForCollection) { + if (args.updateArgs->storeDocOption == CollectionUpdateArgs::StoreDocOption::PreImage) { + invariant(args.updateArgs->preImageDoc); + operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); + if (args.retryableFindAndModifyLocation == + RetryableFindAndModifyLocation::kSideCollection) { + operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); + } + } + if (args.updateArgs->storeDocOption == + CollectionUpdateArgs::StoreDocOption::PostImage) { + invariant(!args.updateArgs->updatedDoc.isEmpty()); + operation.setPostImage(args.updateArgs->updatedDoc.getOwned()); + if (args.retryableFindAndModifyLocation == + RetryableFindAndModifyLocation::kSideCollection) { + operation.setNeedsRetryImage(repl::RetryImageEnum::kPostImage); + } + } + } else if (args.updateArgs->preImageRecordingEnabledForCollection) { invariant(args.updateArgs->preImageDoc); operation.setPreImage(args.updateArgs->preImageDoc->getOwned()); } + operation.setDestinedRecipient( + shardingWriteRouter.getReshardingDestinedRecipient(args.updateArgs->updatedDoc)); txnParticipant.addTransactionOperation(opCtx, operation); } else { @@ -818,19 +839,29 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, OpTimeBundle opTime; if (inMultiDocumentTransaction) { - tassert(5868700, - "Attempted a retryable write within a multi-document transaction", - args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kNone); - const bool inRetryableInternalTransaction = isInternalSessionForRetryableWrite(*opCtx->getLogicalSessionId()); + tassert(5868700, + "Attempted a retryable write within a non-retryable multi-document transaction", + inRetryableInternalTransaction || + args.retryableFindAndModifyLocation == RetryableFindAndModifyLocation::kNone); + auto operation = MutableOplogEntry::makeDeleteOperation(nss, uuid, documentKey.getShardKeyAndId()); if (inRetryableInternalTransaction) { operation.setInitializedStatementIds({stmtId}); + if (args.retryableFindAndModifyLocation != RetryableFindAndModifyLocation::kNone) { + tassert(6054000, + "Deleted document must be present for pre-image recording", + args.deletedDoc); + operation.setPreImage(args.deletedDoc->getOwned()); + if (args.retryableFindAndModifyLocation == + RetryableFindAndModifyLocation::kSideCollection) { + operation.setNeedsRetryImage(repl::RetryImageEnum::kPreImage); + } + } } - if (args.preImageRecordingEnabledForCollection) { tassert(5868701, "Deleted document must be present for pre-image recording", @@ -1253,14 +1284,40 @@ namespace { // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array // field. Appends as many operations as possible to the array (and their corresponding statement // ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the -// maximum number of transaction statements allowed in one entry. +// maximum number of transaction statements allowed in one entry. If any of the statements has +// a pre-image or post-image that needs to be stored in the image collection, stores it to +// 'imageToWrite'. // // Returns an iterator to the first statement that wasn't packed into the applyOps object. std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, std::vector<StmtId>* stmtIdsWritten, + boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite, std::vector<repl::ReplOperation>::iterator stmtBegin, std::vector<repl::ReplOperation>::iterator stmtEnd) { + auto setImageToWrite = [&](const repl::ReplOperation& stmt) { + uassert(6054001, + str::stream() << NamespaceString::kConfigImagesNamespace + << " can only store the pre or post image of one " + "findAndModify operation for each " + "transaction", + !(*imageToWrite)); + switch (*stmt.getNeedsRetryImage()) { + case repl::RetryImageEnum::kPreImage: { + invariant(!stmt.getPreImage().isEmpty()); + *imageToWrite = std::make_pair(repl::RetryImageEnum::kPreImage, stmt.getPreImage()); + break; + } + case repl::RetryImageEnum::kPostImage: { + invariant(!stmt.getPostImage().isEmpty()); + *imageToWrite = + std::make_pair(repl::RetryImageEnum::kPostImage, stmt.getPostImage()); + break; + } + default: + MONGO_UNREACHABLE; + } + }; std::vector<repl::ReplOperation>::iterator stmtIter; BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); @@ -1280,6 +1337,9 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( opsArray.append(stmt.toBSON()); const auto stmtIds = stmt.getStatementIds(); stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end()); + if (stmt.getNeedsRetryImage()) { + setImageToWrite(stmt); + } } try { // BSONArrayBuilder will throw a BSONObjectTooLarge exception if we exceeded the max BSON @@ -1374,11 +1434,13 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // skipping over some reserved slots. // // The number of oplog entries written is returned. -int logOplogEntriesForTransaction(OperationContext* opCtx, - std::vector<repl::ReplOperation>* stmts, - const std::vector<OplogSlot>& oplogSlots, - size_t numberOfPrePostImagesToWrite, - bool prepare) { +int logOplogEntriesForTransaction( + OperationContext* opCtx, + std::vector<repl::ReplOperation>* stmts, + const std::vector<OplogSlot>& oplogSlots, + boost::optional<ImageBundle>* prePostImageToWriteToImageCollection, + size_t numberOfPrePostImagesToWrite, + bool prepare) { invariant(!stmts->empty()); invariant(stmts->size() <= oplogSlots.size()); @@ -1398,28 +1460,44 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); auto currOplogSlot = oplogSlots.begin(); - // We never want to store pre-images when we're migrating oplog entries from another - // replica set. + // We never want to store pre-images or post-images when we're migrating oplog entries from + // another replica set. const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx); - if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { - for (auto& statement : *stmts) { - if (statement.getPreImage().isEmpty()) { - continue; - } + auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement, + const BSONObj& imageDoc) { + auto slot = *currOplogSlot; + ++currOplogSlot; - auto slot = *currOplogSlot; - ++currOplogSlot; + MutableOplogEntry imageEntry; + imageEntry.setOpType(repl::OpTypeEnum::kNoop); + imageEntry.setObject(imageDoc); + imageEntry.setNss(statement.getNss()); + imageEntry.setUuid(statement.getUuid()); + imageEntry.setOpTime(slot); - MutableOplogEntry preImageEntry; - preImageEntry.setOpType(repl::OpTypeEnum::kNoop); - preImageEntry.setObject(statement.getPreImage()); - preImageEntry.setNss(statement.getNss()); - preImageEntry.setUuid(statement.getUuid()); - preImageEntry.setOpTime(slot); + return logOperation(opCtx, &imageEntry); + }; - auto opTime = logOperation(opCtx, &preImageEntry); - statement.setPreImageOpTime(opTime); + if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { + for (auto& statement : *stmts) { + if (!statement.getPreImage().isEmpty() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage) { + // Note that 'needsRetryImage' stores the image kind that needs to stored in the + // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the + // pre-image will be written to the image collection (after all the applyOps oplog + // entries are written). + auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage()); + statement.setPreImageOpTime(opTime); + } + if (!statement.getPostImage().isEmpty() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) { + // Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be + // written to the image collection (after all the applyOps oplog entries are + // written). + auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage()); + statement.setPostImageOpTime(opTime); + } } } @@ -1432,10 +1510,11 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, // termination condition. auto stmtsIter = stmts->begin(); while (stmtsIter != stmts->end()) { - BSONObjBuilder applyOpsBuilder; + boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite; + auto nextStmt = packTransactionStatementsForApplyOps( - &applyOpsBuilder, &stmtIdsWritten, stmtsIter, stmts->end()); + &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end()); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. @@ -1445,9 +1524,23 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, auto implicitCommit = lastOp && !prepare; auto implicitPrepare = lastOp && prepare; auto isPartialTxn = !lastOp; - if (isPartialTxn) { - // Partial transactions create multiple oplog entries in the same WriteUnitOfWork. - // Because of this, partial transactions will set multiple timestamps, violating the + + if (imageToWrite) { + // Reserve an oplog slot for potential forged noop oplog entry for the pre-image or + // post-image. + uassert(6054002, + str::stream() << NamespaceString::kConfigImagesNamespace + << " can only store the pre or post image of one " + "findAndModify operation for each " + "transaction", + !(*prePostImageToWriteToImageCollection)); + ++currOplogSlot; + } + + if (isPartialTxn || (imageToWrite && !prepare)) { + // Partial transactions and unprepared transactions with pre or post image stored in the + // image collection create/reserve multiple oplog entries in the same WriteUnitOfWork. + // Because of this, such transactions will set multiple timestamps, violating the // multi timestamp constraint. It's safe to ignore the multi timestamp constraints here // as additional rollback logic is in place for this case. opCtx->recoveryUnit()->ignoreAllMultiTimestampConstraints(); @@ -1505,6 +1598,14 @@ int logOplogEntriesForTransaction(OperationContext* opCtx, hangAfterLoggingApplyOpsForTransaction.pauseWhileSet(); + if (imageToWrite) { + invariant(!(*prePostImageToWriteToImageCollection)); + *prePostImageToWriteToImageCollection = + ImageBundle{imageToWrite->first, + imageToWrite->second, + prevWriteOpTime.writeOpTime.getTimestamp()}; + } + // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; numEntriesWritten++; @@ -1595,8 +1696,17 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, } // Log in-progress entries for the transaction along with the implicit commit. + boost::optional<ImageBundle> imageToWrite; int numOplogEntries = logOplogEntriesForTransaction( - opCtx, statements, oplogSlots, numberOfPrePostImagesToWrite, false); + opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false); + if (imageToWrite) { + writeToImageCollection(opCtx, + *opCtx->getLogicalSessionId(), + imageToWrite->timestamp, + imageToWrite->imageKind, + imageToWrite->imageDoc); + } + commitOpTime = oplogSlots[numOplogEntries - 1]; invariant(!commitOpTime.isNull()); shardObserveTransactionPrepareOrUnpreparedCommit(opCtx, *statements, commitOpTime); @@ -1659,12 +1769,20 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, // will waste the extra slots. The implicit prepare oplog entry will still use // the last reserved slot, because the transaction participant has already used // that as the prepare time. + boost::optional<ImageBundle> imageToWrite; logOplogEntriesForTransaction(opCtx, statements, reservedSlots, + &imageToWrite, numberOfPrePostImagesToWrite, true /* prepare */); - + if (imageToWrite) { + writeToImageCollection(opCtx, + *opCtx->getLogicalSessionId(), + imageToWrite->timestamp, + imageToWrite->imageKind, + imageToWrite->imageDoc); + } } else { // Log an empty 'prepare' oplog entry. // We need to have at least one reserved slot. diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 740f24808a5..fbfd560f674 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -46,6 +46,7 @@ #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/read_write_concern_defaults.h" #include "mongo/db/read_write_concern_defaults_cache_lookup_mock.h" +#include "mongo/db/repl/apply_ops_command_info.h" #include "mongo/db/repl/image_collection_entry_gen.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" @@ -71,6 +72,72 @@ namespace { using repl::OplogEntry; using unittest::assertGet; +namespace { + +OplogEntry getInnerEntryFromApplyOpsOplogEntry(const OplogEntry& oplogEntry) { + std::vector<repl::OplogEntry> innerEntries; + ASSERT(oplogEntry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps); + repl::ApplyOps::extractOperationsTo(oplogEntry, oplogEntry.getEntry().toBSON(), &innerEntries); + ASSERT_EQ(innerEntries.size(), 1u); + return innerEntries[0]; +} + +void beginRetryableWriteWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr<MongoDOperationContextSession>& contextSession) { + opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx->setTxnNumber(txnNumber); + + contextSession = std::make_unique<MongoDOperationContextSession>(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue(opCtx, + {*opCtx->getTxnNumber()}, + boost::none /* autocommit */, + boost::none /* startTransaction */); +}; + +void beginNonRetryableTransactionWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr<MongoDOperationContextSession>& contextSession) { + opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + + contextSession = std::make_unique<MongoDOperationContextSession>(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); +}; + +void beginRetryableInternalTransactionWithTxnNumber( + OperationContext* opCtx, + TxnNumber txnNumber, + std::unique_ptr<MongoDOperationContextSession>& contextSession) { + RAIIServerParameterControllerForTest controller{"featureFlagInternalTransactions", true}; + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + + opCtx->setLogicalSessionId(makeLogicalSessionIdWithTxnNumberAndUUIDForTest()); + opCtx->setTxnNumber(txnNumber); + opCtx->setInMultiDocumentTransaction(); + + contextSession = std::make_unique<MongoDOperationContextSession>(opCtx); + auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant.beginOrContinue( + opCtx, {*opCtx->getTxnNumber()}, false /* autocommit */, true /* startTransaction */); +}; + +template <typename OpObserverType> +void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObserver) { + auto txnParticipant = TransactionParticipant::get(opCtx); + auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx); + opObserver.onUnpreparedTransactionCommit( + opCtx, &txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest()); +} + +} // namespace + class OpObserverTest : public ServiceContextMongoDTest { public: void setUp() override { @@ -95,6 +162,10 @@ public: ReadWriteConcernDefaults::create(getServiceContext(), _lookupMock.getFetchDefaultsFn()); } + void tearDown() override { + serverGlobalParams.clusterRole = ClusterRole::None; + } + void reset(OperationContext* opCtx, NamespaceString nss) const { writeConflictRetry(opCtx, "deleteAll", nss.ns(), [&] { opCtx->recoveryUnit()->setTimestampReadSource(RecoveryUnit::ReadSource::kNoTimestamp); @@ -163,6 +234,11 @@ protected: return getNOplogEntries(opCtx, 1).back(); } + BSONObj getInnerEntryFromSingleApplyOpsOplogEntry(OperationContext* opCtx) { + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(getNOplogEntries(opCtx, 1).back())); + return getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry).getEntry().toBSON(); + } + bool didWriteImageEntryToSideCollection(OperationContext* opCtx, const LogicalSessionId& sessionId) { AutoGetCollection sideCollection( @@ -739,17 +815,9 @@ public: void setUp() override { OpObserverTest::setUp(); _opCtx = cc().makeOperationContext(); - _opObserver.emplace(); - MongoDSessionCatalog::onStepUp(opCtx()); _times.emplace(opCtx()); - - opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx()->setTxnNumber(txnNum()); - opCtx()->setInMultiDocumentTransaction(); - _sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx()); - _txnParticipant.emplace(TransactionParticipant::get(opCtx())); } void tearDown() override { @@ -760,6 +828,20 @@ public: OpObserverTest::tearDown(); } + void setUpRetryableWrite() { + beginRetryableWriteWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } + + void setUpNonRetryableTransaction() { + beginNonRetryableTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } + + void setUpRetryableInternalTransaction() { + beginRetryableInternalTransactionWithTxnNumber(opCtx(), txnNum(), _sessionCheckout); + _txnParticipant.emplace(TransactionParticipant::get(opCtx())); + } protected: Session* session() { @@ -806,10 +888,7 @@ class OpObserverTransactionTest : public OpObserverTxnParticipantTest { public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue(opCtx(), - {*opCtx()->getTxnNumber()}, - false /* autocommit */, - true /* startTransaction */); + OpObserverTxnParticipantTest::setUpNonRetryableTransaction(); } protected: @@ -1498,109 +1577,214 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { */ class OpObserverRetryableFindAndModifyTest : public OpObserverTxnParticipantTest { public: + void tearDown() override { + OpObserverTxnParticipantTest::tearDown(); + } + +protected: + void testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + CollectionUpdateArgs updateArgs; + updateArgs.stmtIds = {0}; + updateArgs.updatedDoc = BSON("_id" << 0 << "data" + << "x"); + updateArgs.update = BSON("$set" << BSON("data" + << "x")); + updateArgs.criteria = BSON("_id" << 0); + updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage; + OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + opObserver().onUpdate(opCtx(), update); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "postImage"_sd); + } + + void testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + CollectionUpdateArgs updateArgs; + updateArgs.stmtIds = {0}; + updateArgs.preImageDoc = BSON("_id" << 0 << "data" + << "y"); + updateArgs.update = BSON("$set" << BSON("data" + << "x")); + updateArgs.criteria = BSON("_id" << 0); + updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; + OplogUpdateEntryArgs update(&updateArgs, nss, uuid); + update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + opObserver().onUpdate(opCtx(), update); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "preImage"_sd); + } + + void testRetryableFindAndModifyDeleteHasNeedsRetryImage() { + NamespaceString nss = {"test", "coll"}; + const auto uuid = CollectionUUID::gen(); + + WriteUnitOfWork wunit(opCtx()); + AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); + const auto deletedDoc = BSON("_id" << 0 << "data" + << "x"); + opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc); + OplogDeleteEntryArgs args; + args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; + args.deletedDoc = &deletedDoc; + opObserver().onDelete(opCtx(), nss, uuid, 0, args); + commit(); + + // Asserts that only a single oplog entry was created. In essence, we did not create any + // no-op image entries in the oplog. + const auto oplogEntry = assertGetSingleOplogEntry(); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); + ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); + ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); + ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), + "preImage"_sd); + } + + virtual void commit() = 0; + + virtual BSONObj assertGetSingleOplogEntry() = 0; +}; + +class OpObserverRetryableFindAndModifyOutsideTransactionTest + : public OpObserverRetryableFindAndModifyTest { +public: void setUp() override { OpObserverTxnParticipantTest::setUp(); - txnParticipant().beginOrContinue( - opCtx(), {txnNum()}, boost::none /* autocommit */, boost::none /* startTransaction */); + OpObserverTxnParticipantTest::setUpRetryableWrite(); } - void tearDown() override { - OpObserverTxnParticipantTest::tearDown(); +protected: + void commit() final{}; + + BSONObj assertGetSingleOplogEntry() final { + return getSingleOplogEntry(opCtx()); } }; -TEST_F(OpObserverRetryableFindAndModifyTest, +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} - CollectionUpdateArgs updateArgs; - updateArgs.stmtIds = {0}; - updateArgs.updatedDoc = BSON("_id" << 0 << "data" - << "x"); - updateArgs.update = BSON("$set" << BSON("data" - << "x")); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PostImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - opObserver().onUpdate(opCtx(), update); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "postImage"_sd); -} - -TEST_F(OpObserverRetryableFindAndModifyTest, +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} - CollectionUpdateArgs updateArgs; - updateArgs.stmtIds = {0}; - updateArgs.preImageDoc = BSON("_id" << 0 << "data" - << "y"); - updateArgs.update = BSON("$set" << BSON("data" - << "x")); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = CollectionUpdateArgs::StoreDocOption::PreImage; - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - opObserver().onUpdate(opCtx(), update); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "preImage"_sd); -} - -TEST_F(OpObserverRetryableFindAndModifyTest, RetryableFindAndModifyDeleteHasNeedsRetryImage) { - NamespaceString nss = {"test", "coll"}; - const auto uuid = CollectionUUID::gen(); +TEST_F(OpObserverRetryableFindAndModifyOutsideTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} - WriteUnitOfWork wunit(opCtx()); - AutoGetDb autoDb(opCtx(), nss.db(), MODE_X); - const auto deletedDoc = BSON("_id" << 0 << "data" - << "x"); - opObserver().aboutToDelete(opCtx(), nss, uuid, deletedDoc); - OplogDeleteEntryArgs args; - args.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - args.deletedDoc = &deletedDoc; - opObserver().onDelete(opCtx(), nss, uuid, 0, args); - // Asserts that only a single oplog entry was created. In essence, we did not create any - // no-op image entries in the oplog. - const auto oplogEntry = getSingleOplogEntry(opCtx()); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPreImageOpTimeFieldName)); - ASSERT_FALSE(oplogEntry.hasField(repl::OplogEntryBase::kPostImageOpTimeFieldName)); - ASSERT_TRUE(oplogEntry.hasField(repl::OplogEntryBase::kNeedsRetryImageFieldName)); - ASSERT_EQUALS(oplogEntry.getStringField(repl::OplogEntryBase::kNeedsRetryImageFieldName), - "preImage"_sd); -} - -OplogEntry findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) { +class OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest + : public OpObserverRetryableFindAndModifyTest { +public: + void setUp() override { + OpObserverTxnParticipantTest::setUp(); + OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); + } + +protected: + void commit() final { + commitUnpreparedTransaction<OpObserverImpl>(opCtx(), opObserver()); + }; + + BSONObj assertGetSingleOplogEntry() final { + return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx()); + } +}; + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsideUnpreparedRetryableInternalTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} + +class OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest + : public OpObserverRetryableFindAndModifyTest { +public: + void setUp() override { + serverGlobalParams.clusterRole = ClusterRole::ShardServer; + OpObserverTxnParticipantTest::setUp(); + OpObserverTxnParticipantTest::setUpRetryableInternalTransaction(); + } + +protected: + void commit() final { + const auto prepareSlot = repl::getNextOpTime(opCtx()); + txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot); + auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx()); + opObserver().onTransactionPrepare( + opCtx(), + {prepareSlot}, + &txnOps, + txnParticipant().getNumberOfPrePostImagesToWriteForTest()); + }; + + BSONObj assertGetSingleOplogEntry() final { + return getInnerEntryFromSingleApplyOpsOplogEntry(opCtx()); + } +}; + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPostImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage) { + testRetryableFindAndModifyUpdateRequestingPreImageHasNeedsRetryImage(); +} + +TEST_F(OpObserverRetryableFindAndModifyInsidePreparedRetryableInternalTransactionTest, + RetryableFindAndModifyDeleteHasNeedsRetryImage) { + testRetryableFindAndModifyDeleteHasNeedsRetryImage(); +} + +boost::optional<OplogEntry> findByTimestamp(const std::vector<BSONObj>& oplogs, Timestamp ts) { for (auto& oplog : oplogs) { const auto& entry = assertGet(OplogEntry::parse(oplog)); if (entry.getTimestamp() == ts) { return entry; } } - - FAIL("Not found."); - // C++/clang isn't smart enough to know FAIL is guaranteed to throw. - MONGO_UNREACHABLE; + return boost::none; } using StoreDocOption = CollectionUpdateArgs::StoreDocOption; @@ -1619,6 +1803,8 @@ const bool kChangeStreamImagesDisabled = false; const auto kNotRetryable = RetryableFindAndModifyLocation::kNone; const auto kRecordInOplog = RetryableFindAndModifyLocation::kOplog; const auto kRecordInSideCollection = RetryableFindAndModifyLocation::kSideCollection; + +const std::vector<bool> kInMultiDocumentTransactionCases{false, true}; } // namespace struct UpdateTestCase { @@ -1662,48 +1848,9 @@ struct UpdateTestCase { } }; -TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { - // Create a registry that only registers the Impl. It can be challenging to call methods on the - // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due - // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf. - OpObserverRegistry opObserver; - opObserver.addObserver(std::make_unique<OpObserverImpl>()); - - NamespaceString nss("test", "coll"); - CollectionUUID uuid = CollectionUUID::gen(); - - std::vector<UpdateTestCase> cases = { - // Regular updates. - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1}, - {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, - // FindAndModify asking for a preImage. - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, - // FindAndModify asking for a postImage. - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3}, - {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; - - for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { - const auto& testCase = cases[testIdx]; +class OnUpdateOutputsTest : public OpObserverTest { +protected: + void logTestCase(const UpdateTestCase& testCase) { LOGV2(5739902, "UpdateTestCase", "ImageType"_attr = testCase.getImageTypeStr(), @@ -1712,98 +1859,87 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { "RetryableFindAndModifyLocation"_attr = testCase.getRetryableFindAndModifyLocationStr(), "ExpectedOplogEntries"_attr = testCase.numOutputOplogs); + } - CollectionUpdateArgs updateArgs; - updateArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; - updateArgs.changeStreamPreAndPostImagesEnabledForCollection = + void initializeOplogUpdateEntryArgs(OperationContext* opCtx, + const UpdateTestCase& testCase, + OplogUpdateEntryArgs* update) { + update->updateArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + update->updateArgs->changeStreamPreAndPostImagesEnabledForCollection = testCase.changeStreamImagesEnabled; - auto opCtxRaii = cc().makeOperationContext(); - OperationContext* opCtx = opCtxRaii.get(); - // Phase 1: Clearing any state and setting up fixtures/the update call. - resetOplogAndTransactions(opCtx); - - OplogUpdateEntryArgs update(&updateArgs, nss, uuid); - boost::optional<MongoDOperationContextSession> contextSession; - boost::optional<TransactionParticipant::Participant> txnParticipant; switch (testCase.retryableOptions) { case kNotRetryable: - updateArgs.stmtIds = {kUninitializedStmtId}; + update->updateArgs->stmtIds = {kUninitializedStmtId}; break; case kRecordInOplog: - update.retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog; - updateArgs.stmtIds = {1}; + update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kOplog; + update->updateArgs->stmtIds = {1}; break; case kRecordInSideCollection: - update.retryableFindAndModifyLocation = + update->retryableFindAndModifyLocation = RetryableFindAndModifyLocation::kSideCollection; - updateArgs.stmtIds = {1}; - if (testCase.alwaysRecordPreImages && - testCase.retryableOptions == kRecordInSideCollection) { + update->updateArgs->stmtIds = {1}; + if (testCase.retryableOptions == kRecordInSideCollection) { // 'getNextOpTimes' requires us to be inside a WUOW when reserving oplog slots. WriteUnitOfWork wuow(opCtx); auto reservedSlots = repl::getNextOpTimes(opCtx, 3); - updateArgs.oplogSlots = reservedSlots; + update->updateArgs->oplogSlots = reservedSlots; } break; } - if (testCase.retryableOptions != kNotRetryable) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); - } - - updateArgs.preImageDoc = boost::none; + update->updateArgs->preImageDoc = boost::none; if (testCase.imageType == StoreDocOption::PreImage || testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { - updateArgs.preImageDoc = BSON("_id" << 0 << "preImage" << true); + update->updateArgs->preImageDoc = BSON("_id" << 0 << "preImage" << true); } - - updateArgs.updatedDoc = BSON("_id" << 0 << "postImage" << true); - updateArgs.update = + update->updateArgs->updatedDoc = BSON("_id" << 0 << "postImage" << true); + update->updateArgs->update = BSON("$set" << BSON("postImage" << true) << "$unset" << BSON("preImage" << 1)); - updateArgs.criteria = BSON("_id" << 0); - updateArgs.storeDocOption = testCase.imageType; - - // Phase 2: Call the code we're testing. - WriteUnitOfWork wuow(opCtx); - AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX); - opObserver.onUpdate(opCtx, update); - wuow.commit(); - - // Phase 3: Analyze the results: - - // This `getNOplogEntries` also asserts that all oplogs are retrieved. - std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); - // Entries are returned in ascending timestamp order. - const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back())); + update->updateArgs->criteria = BSON("_id" << 0); + update->updateArgs->storeDocOption = testCase.imageType; + } + void checkPreImageInOplogIfNeeded(const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector<BSONObj>& oplogs, + const OplogEntry& updateOplogEntry) { const bool checkPreImageInOplog = testCase.alwaysRecordPreImages || (testCase.imageType == StoreDocOption::PreImage && testCase.retryableOptions == kRecordInOplog); if (checkPreImageInOplog) { - ASSERT(actualOp.getPreImageOpTime()); - const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp(); + ASSERT(updateOplogEntry.getPreImageOpTime()); + const Timestamp preImageOpTime = updateOplogEntry.getPreImageOpTime()->getTimestamp(); ASSERT_FALSE(preImageOpTime.isNull()); - OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime); + OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime); ASSERT_BSONOBJ_EQ(update.updateArgs->preImageDoc.get(), preImage.getObject()); + } else { + ASSERT_FALSE(updateOplogEntry.getPreImageOpTime()); } + } + void checkPostImageInOplogIfNeeded(const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector<BSONObj>& oplogs, + const OplogEntry& updateOplogEntry) { const bool checkPostImageInOplog = testCase.imageType == StoreDocOption::PostImage && testCase.retryableOptions == kRecordInOplog; if (checkPostImageInOplog) { - ASSERT(actualOp.getPostImageOpTime()); - const Timestamp postImageOpTime = actualOp.getPostImageOpTime()->getTimestamp(); + ASSERT(updateOplogEntry.getPostImageOpTime()); + const Timestamp postImageOpTime = updateOplogEntry.getPostImageOpTime()->getTimestamp(); ASSERT_FALSE(postImageOpTime.isNull()); - OplogEntry postImage = findByTimestamp(oplogs, postImageOpTime); + OplogEntry postImage = *findByTimestamp(oplogs, postImageOpTime); ASSERT_BSONOBJ_EQ(update.updateArgs->updatedDoc, postImage.getObject()); + } else { + ASSERT_FALSE(updateOplogEntry.getPostImageOpTime()); } + } + void checkSideCollectionIfNeeded(OperationContext* opCtx, + const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const std::vector<BSONObj>& oplogs, + const OplogEntry& updateOplogEntry) { bool checkSideCollection = testCase.isFindAndModify() && testCase.retryableOptions == kRecordInSideCollection; if (checkSideCollection && testCase.alwaysRecordPreImages && @@ -1813,32 +1949,180 @@ TEST_F(OpObserverTest, TestFundamentalOnUpdateOutputs) { // in the side collection. checkSideCollection = false; } - if (checkSideCollection) { repl::ImageEntry imageEntry = - getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId()); + getImageEntryFromSideCollection(opCtx, *updateOplogEntry.getSessionId()); const BSONObj& expectedImage = testCase.imageType == StoreDocOption::PreImage ? update.updateArgs->preImageDoc.get() : update.updateArgs->updatedDoc; ASSERT_BSONOBJ_EQ(expectedImage, imageEntry.getImage()); + ASSERT(imageEntry.getImageKind() == updateOplogEntry.getNeedsRetryImage()); if (testCase.imageType == StoreDocOption::PreImage) { ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage); } else { ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPostImage); } + + // If 'updateOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged + // noop oplog entry for the pre/postImage written to the side collection. + const Timestamp forgeNoopTimestamp = updateOplogEntry.getTimestamp() - 1; + ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp)); + } else { + ASSERT_FALSE(updateOplogEntry.getNeedsRetryImage()); + if (updateOplogEntry.getSessionId()) { + ASSERT_FALSE( + didWriteImageEntryToSideCollection(opCtx, *updateOplogEntry.getSessionId())); + } else { + // Session id is missing only for non-retryable option. + ASSERT(testCase.retryableOptions == kNotRetryable); + } } + } + void checkChangeStreamImagesIfNeeded(OperationContext* opCtx, + const UpdateTestCase& testCase, + const OplogUpdateEntryArgs& update, + const OplogEntry& updateOplogEntry) { if (testCase.changeStreamImagesEnabled) { BSONObj container; - ChangeStreamPreImageId preImageId(uuid, actualOp.getOpTime().getTimestamp(), 0); + ChangeStreamPreImageId preImageId( + _uuid, updateOplogEntry.getOpTime().getTimestamp(), 0); ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container); const BSONObj& expectedImage = update.updateArgs->preImageDoc.get(); ASSERT_BSONOBJ_EQ(expectedImage, preImage.getPreImage()); - ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime()); + ASSERT_EQ(updateOplogEntry.getWallClockTime(), preImage.getOperationTime()); + } + } + + std::vector<UpdateTestCase> _cases = { + // Regular updates. + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 1}, + {kNonFaM, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kNonFaM, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, + // FindAndModify asking for a preImage. + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kFaMPre, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPre, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}, + // FindAndModify asking for a postImage. + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kFaMPost, kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 3}, + {kFaMPost, kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; + + const NamespaceString _nss{"test", "coll"}; + const CollectionUUID _uuid = CollectionUUID::gen(); +}; + +TEST_F(OnUpdateOutputsTest, TestNonTransactionFundamentalOnUpdateOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique<OpObserverImpl>()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the update call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr<MongoDOperationContextSession> contextSession; + if (testCase.isRetryable()) { + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); } + + // Phase 2: Call the code we're testing. + CollectionUpdateArgs updateArgs; + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); + initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + opObserver.onUpdate(opCtx, updateEntryArgs); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto updateOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkChangeStreamImagesIfNeeded(opCtx, testCase, updateEntryArgs, updateOplogEntry); } } +TEST_F(OnUpdateOutputsTest, TestFundamentalTransactionOnUpdateOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique<OpObserverImpl>()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { + continue; + } + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the update call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr<MongoDOperationContextSession> contextSession; + if (testCase.isRetryable()) { + beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } else { + beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + CollectionUpdateArgs updateArgs; + OplogUpdateEntryArgs updateEntryArgs(&updateArgs, _nss, _uuid); + initializeOplogUpdateEntryArgs(opCtx, testCase, &updateEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + opObserver.onUpdate(opCtx, updateEntryArgs); + commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + auto updateOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry); + checkPreImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkPostImageInOplogIfNeeded(testCase, updateEntryArgs, oplogs, updateOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, updateEntryArgs, oplogs, updateOplogEntry); + } +} struct InsertTestCase { bool isRetryableWrite; @@ -1882,17 +2166,9 @@ TEST_F(OpObserverTest, TestFundamentalOnInsertsOutputs) { toInsert.emplace_back(stmtId, BSON("_id" << stmtIdx)); } - boost::optional<MongoDOperationContextSession> contextSession; - boost::optional<TransactionParticipant::Participant> txnParticipant; + std::unique_ptr<MongoDOperationContextSession> contextSession; if (testCase.isRetryableWrite) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); } // Phase 2: Call the code we're testing. @@ -1971,31 +2247,10 @@ struct DeleteTestCase { } }; -TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { - // Create a registry that only registers the Impl. It can be challenging to call methods on the - // Impl directly. It falls into cases where `ReservedTimes` is expected to be instantiated. Due - // to strong encapsulation, we use the registry that managers the `ReservedTimes` on our behalf. - OpObserverRegistry opObserver; - opObserver.addObserver(std::make_unique<OpObserverImpl>()); - - NamespaceString nss("test", "coll"); - CollectionUUID uuid = CollectionUUID::gen(); - - // For the DeleteTestCase, we add a "pre-image" deletedDoc when using `kRecordInOplog` and - // `kRecordInSideCollection`. - std::vector<DeleteTestCase> cases{ - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, - {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, - {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, - {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, - {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; +class OnDeleteOutputsTest : public OpObserverTest { - for (std::size_t testIdx = 0; testIdx < cases.size(); ++testIdx) { - const auto& testCase = cases[testIdx]; +protected: + void logTestCase(const DeleteTestCase& testCase) { LOGV2(5739905, "DeleteTestCase", "PreImageRecording"_attr = testCase.alwaysRecordPreImages, @@ -2003,111 +2258,214 @@ TEST_F(OpObserverTest, TestFundamentalOnDeleteOutputs) { "RetryableFindAndModifyLocation"_attr = testCase.getRetryableFindAndModifyLocationStr(), "ExpectedOplogEntries"_attr = testCase.numOutputOplogs); + } - OplogDeleteEntryArgs deleteArgs; - deleteArgs.preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; - deleteArgs.changeStreamPreAndPostImagesEnabledForCollection = + void initializeOplogDeleteEntryArgs(OperationContext* opCtx, + const DeleteTestCase& testCase, + OplogDeleteEntryArgs* deleteArgs) { + deleteArgs->preImageRecordingEnabledForCollection = testCase.alwaysRecordPreImages; + deleteArgs->changeStreamPreAndPostImagesEnabledForCollection = testCase.changeStreamImagesEnabled; - auto opCtxRaii = cc().makeOperationContext(); - OperationContext* opCtx = opCtxRaii.get(); - // Phase 1: Clearing any state and setting up fixtures/the update call. - resetOplogAndTransactions(opCtx); - - boost::optional<MongoDOperationContextSession> contextSession; - boost::optional<TransactionParticipant::Participant> txnParticipant; - if (testCase.retryableOptions != kNotRetryable) { - opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - opCtx->setTxnNumber(TxnNumber(testIdx)); - contextSession.emplace(opCtx); - txnParticipant.emplace(TransactionParticipant::get(opCtx)); - txnParticipant->beginOrContinue(opCtx, - {TxnNumber(testIdx)}, - boost::none /* autocommit */, - boost::none /* startTransaction */); - } switch (testCase.retryableOptions) { case kNotRetryable: - deleteArgs.retryableFindAndModifyLocation = kNotRetryable; + deleteArgs->retryableFindAndModifyLocation = kNotRetryable; break; case kRecordInOplog: - deleteArgs.retryableFindAndModifyLocation = kRecordInOplog; + deleteArgs->retryableFindAndModifyLocation = kRecordInOplog; break; case kRecordInSideCollection: - deleteArgs.retryableFindAndModifyLocation = kRecordInSideCollection; + deleteArgs->retryableFindAndModifyLocation = kRecordInSideCollection; break; } - - const BSONObj deletedDoc = BSON("_id" << 0 << "valuePriorToDelete" - << "marvelous"); if (testCase.isRetryable() || testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { - deleteArgs.deletedDoc = &deletedDoc; + deleteArgs->deletedDoc = &_deletedDoc; } - // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect - // of setting of `documentKey` on the delete for sharding purposes. - // `OpObserverImpl::onDelete` asserts its existence. - documentKeyDecoration(opCtx).emplace(deletedDoc["_id"].wrap(), boost::none); - StmtId deleteStmtId = kUninitializedStmtId; - if (testCase.isRetryable()) { - deleteStmtId = {1}; - } - - // Phase 2: Call the code we're testing. - WriteUnitOfWork wuow(opCtx); - AutoGetCollection locks(opCtx, nss, LockMode::MODE_IX); - opObserver.onDelete(opCtx, nss, uuid, deleteStmtId, deleteArgs); - wuow.commit(); - - // Phase 3: Analyze the results: - - // This `getNOplogEntries` also asserts that all oplogs are retrieved. - std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); - // Entries are returned in ascending timestamp order. - const OplogEntry& actualOp = assertGet(OplogEntry::parse(oplogs.back())); + } + void checkPreImageInOplogIfNeeded(const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const std::vector<BSONObj>& oplogs, + const OplogEntry& deleteOplogEntry) { const bool checkPreImageInOplog = deleteArgs.preImageRecordingEnabledForCollection || deleteArgs.retryableFindAndModifyLocation == kRecordInOplog; if (checkPreImageInOplog) { - ASSERT(actualOp.getPreImageOpTime()); - const Timestamp preImageOpTime = actualOp.getPreImageOpTime()->getTimestamp(); + ASSERT(deleteOplogEntry.getPreImageOpTime()); + const Timestamp preImageOpTime = deleteOplogEntry.getPreImageOpTime()->getTimestamp(); ASSERT_FALSE(preImageOpTime.isNull()); - OplogEntry preImage = findByTimestamp(oplogs, preImageOpTime); - ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getObject()); + OplogEntry preImage = *findByTimestamp(oplogs, preImageOpTime); + ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getObject()); } else { - ASSERT_FALSE(actualOp.getPreImageOpTime()); + ASSERT_FALSE(deleteOplogEntry.getPreImageOpTime()); } + } + void checkSideCollectionIfNeeded(OperationContext* opCtx, + const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const std::vector<BSONObj>& oplogs, + const OplogEntry& deleteOplogEntry) { bool didWriteInSideCollection = deleteArgs.retryableFindAndModifyLocation == kRecordInSideCollection && !deleteArgs.preImageRecordingEnabledForCollection; if (didWriteInSideCollection) { repl::ImageEntry imageEntry = - getImageEntryFromSideCollection(opCtx, *actualOp.getSessionId()); + getImageEntryFromSideCollection(opCtx, *deleteOplogEntry.getSessionId()); + ASSERT(imageEntry.getImageKind() == deleteOplogEntry.getNeedsRetryImage()); ASSERT(imageEntry.getImageKind() == repl::RetryImageEnum::kPreImage); - ASSERT_BSONOBJ_EQ(deletedDoc, imageEntry.getImage()); + ASSERT_BSONOBJ_EQ(_deletedDoc, imageEntry.getImage()); + + // If 'deleteOplogEntry' has opTime T, opTime T-1 must be reserved for potential forged + // noop oplog entry for the preImage written to the side collection. + const Timestamp forgeNoopTimestamp = deleteOplogEntry.getTimestamp() - 1; + ASSERT_FALSE(findByTimestamp(oplogs, forgeNoopTimestamp)); } else { - if (actualOp.getSessionId()) { - ASSERT_FALSE(didWriteImageEntryToSideCollection(opCtx, *actualOp.getSessionId())); + ASSERT_FALSE(deleteOplogEntry.getNeedsRetryImage()); + if (deleteOplogEntry.getSessionId()) { + ASSERT_FALSE( + didWriteImageEntryToSideCollection(opCtx, *deleteOplogEntry.getSessionId())); } else { // Session id is missing only for non-retryable option. ASSERT(testCase.retryableOptions == kNotRetryable); } } + } - const Timestamp preImageOpTime = actualOp.getOpTime().getTimestamp(); - ChangeStreamPreImageId preImageId(uuid, preImageOpTime, 0); + void checkChangeStreamImagesIfNeeded(OperationContext* opCtx, + const DeleteTestCase& testCase, + const OplogDeleteEntryArgs& deleteArgs, + const OplogEntry& deleteOplogEntry) { + const Timestamp preImageOpTime = deleteOplogEntry.getOpTime().getTimestamp(); + ChangeStreamPreImageId preImageId(_uuid, preImageOpTime, 0); if (deleteArgs.changeStreamPreAndPostImagesEnabledForCollection) { BSONObj container; ChangeStreamPreImage preImage = getChangeStreamPreImage(opCtx, preImageId, &container); - ASSERT_BSONOBJ_EQ(deletedDoc, preImage.getPreImage()); - ASSERT_EQ(actualOp.getWallClockTime(), preImage.getOperationTime()); + ASSERT_BSONOBJ_EQ(_deletedDoc, preImage.getPreImage()); + ASSERT_EQ(deleteOplogEntry.getWallClockTime(), preImage.getOperationTime()); } else { ASSERT_FALSE(didWriteDeletedDocToPreImagesCollection(opCtx, preImageId)); } } + + std::vector<DeleteTestCase> _cases{ + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kDoNotRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kNotRetryable, 1}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInOplog, 2}, + {kDoNotRecordPreImages, kChangeStreamImagesEnabled, kRecordInSideCollection, 1}, + {kRecordPreImages, kChangeStreamImagesDisabled, kNotRetryable, 2}, + {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInOplog, 2}, + {kRecordPreImages, kChangeStreamImagesDisabled, kRecordInSideCollection, 2}}; + + const NamespaceString _nss{"test", "coll"}; + const CollectionUUID _uuid = CollectionUUID::gen(); + const BSONObj _deletedDoc = BSON("_id" << 0 << "valuePriorToDelete" + << "marvelous"); +}; + +TEST_F(OnDeleteOutputsTest, TestNonTransactionFundamentalOnDeleteOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique<OpObserverImpl>()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the delete call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr<MongoDOperationContextSession> contextSession; + if (testCase.isRetryable()) { + beginRetryableWriteWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + OplogDeleteEntryArgs deleteEntryArgs; + initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs); + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting of `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence. + documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none); + opObserver.onDelete( + opCtx, _nss, _uuid, testCase.isRetryable() ? 1 : kUninitializedStmtId, deleteEntryArgs); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto deleteOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkChangeStreamImagesIfNeeded(opCtx, testCase, deleteEntryArgs, deleteOplogEntry); + } } +TEST_F(OnDeleteOutputsTest, TestTransactionFundamentalOnDeleteOutputs) { + // Create a registry that only registers the Impl. It can be challenging to call methods on + // the Impl directly. It falls into cases where `ReservedTimes` is expected to be + // instantiated. Due to strong encapsulation, we use the registry that managers the + // `ReservedTimes` on our behalf. + OpObserverRegistry opObserver; + opObserver.addObserver(std::make_unique<OpObserverImpl>()); + + for (std::size_t testIdx = 0; testIdx < _cases.size(); ++testIdx) { + const auto& testCase = _cases[testIdx]; + if (testCase.alwaysRecordPreImages || testCase.changeStreamImagesEnabled) { + continue; + } + logTestCase(testCase); + + auto opCtxRaii = cc().makeOperationContext(); + OperationContext* opCtx = opCtxRaii.get(); + + // Phase 1: Clearing any state and setting up fixtures/the delete call. + resetOplogAndTransactions(opCtx); + + std::unique_ptr<MongoDOperationContextSession> contextSession; + if (testCase.isRetryable()) { + beginRetryableInternalTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } else { + beginNonRetryableTransactionWithTxnNumber(opCtx, testIdx, contextSession); + } + + // Phase 2: Call the code we're testing. + OplogDeleteEntryArgs deleteEntryArgs; + initializeOplogDeleteEntryArgs(opCtx, testCase, &deleteEntryArgs); + const auto stmtId = testCase.isRetryable() ? 1 : kUninitializedStmtId; + + WriteUnitOfWork wuow(opCtx); + AutoGetCollection locks(opCtx, _nss, LockMode::MODE_IX); + // This test does not call `OpObserver::aboutToDelete`. That method has the side-effect + // of setting of `documentKey` on the delete for sharding purposes. + // `OpObserverImpl::onDelete` asserts its existence. + documentKeyDecoration(opCtx).emplace(_deletedDoc["_id"].wrap(), boost::none); + opObserver.onDelete(opCtx, _nss, _uuid, stmtId, deleteEntryArgs); + commitUnpreparedTransaction<OpObserverRegistry>(opCtx, opObserver); + wuow.commit(); + + // Phase 3: Analyze the results: + // This `getNOplogEntries` also asserts that all oplogs are retrieved. + std::vector<BSONObj> oplogs = getNOplogEntries(opCtx, testCase.numOutputOplogs); + // Entries are returned in ascending timestamp order. + auto applyOpsOplogEntry = assertGet(OplogEntry::parse(oplogs.back())); + auto deleteOplogEntry = getInnerEntryFromApplyOpsOplogEntry(applyOpsOplogEntry); + checkPreImageInOplogIfNeeded(testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + checkSideCollectionIfNeeded(opCtx, testCase, deleteEntryArgs, oplogs, deleteOplogEntry); + } +} TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { const NamespaceString nss1("testDB", "testColl"); diff --git a/src/mongo/db/ops/write_ops.idl b/src/mongo/db/ops/write_ops.idl index 2254f4fcb58..415b50f9ea1 100644 --- a/src/mongo/db/ops/write_ops.idl +++ b/src/mongo/db/ops/write_ops.idl @@ -417,6 +417,10 @@ commands: description: "When true, returns the modified document rather than the original." type: safeBool optional: true + stmtId: + description: "The statement number for this findAndModify operation." + type: int + optional: true bypassDocumentValidation: description: "Enables the operation to bypass document validation. This lets you write documents that do not meet the validation requirements." diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 0bffa5e01b3..7ee378a0f9d 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -1550,17 +1550,18 @@ Status applyOperation_inlock(OperationContext* opCtx, } if (op.getNeedsRetryImage()) { - writeToImageCollection(opCtx, - op.getSessionId().get(), - op.getTxnNumber().get(), - op.getTimestamp(), - op.getNeedsRetryImage().get(), - // If we did not request an image because we're in - // initial sync, the value passed in here is conveniently - // the empty BSONObj. - ur.requestedDocImage, - getInvalidatingReason(mode, isDataConsistent), - &upsertConfigImage); + writeToImageCollection( + opCtx, + op.getSessionId().get(), + op.getTxnNumber().get(), + op.getTimestampForRetryImage().value_or(op.getTimestamp()), + op.getNeedsRetryImage().get(), + // If we did not request an image because we're in + // initial sync, the value passed in here is conveniently + // the empty BSONObj. + ur.requestedDocImage, + getInvalidatingReason(mode, isDataConsistent), + &upsertConfigImage); } wuow.commit(); @@ -1625,14 +1626,15 @@ Status applyOperation_inlock(OperationContext* opCtx, // isn't strictly necessary for correctness -- the `config.transactions` table // is responsible for whether to retry. The motivation here is to simply reduce // the number of states related documents in the two collections can be in. - writeToImageCollection(opCtx, - op.getSessionId().get(), - op.getTxnNumber().get(), - op.getTimestamp(), - repl::RetryImageEnum::kPreImage, - result.requestedPreImage.value_or(BSONObj()), - getInvalidatingReason(mode, isDataConsistent), - &upsertConfigImage); + writeToImageCollection( + opCtx, + op.getSessionId().get(), + op.getTxnNumber().get(), + op.getTimestampForRetryImage().value_or(op.getTimestamp()), + repl::RetryImageEnum::kPreImage, + result.requestedPreImage.value_or(BSONObj()), + getInvalidatingReason(mode, isDataConsistent), + &upsertConfigImage); } if (result.nDeleted == 0 && mode == OplogApplication::Mode::kSecondary) { diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index 90156b855b1..ce141a2b987 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -85,6 +85,12 @@ public: } void setPreImage(BSONObj value) { + if (!_fullPreImage.isEmpty()) { + uassert(6054003, + "Cannot set pre-image more than once", + _fullPreImage.woCompare(value) == 0); + return; + } _fullPreImage = std::move(value); } @@ -93,6 +99,12 @@ public: } void setPostImage(BSONObj value) { + if (!_fullPostImage.isEmpty()) { + uassert(6054004, + "Cannot set post-image more than once", + _fullPostImage.woCompare(value) == 0); + return; + } _fullPostImage = std::move(value); } @@ -117,6 +129,9 @@ public: private: BSONObj _preImageDocumentKey; + + // Used for storing the pre-image and post-image for the operation in-memory regardless of where + // the images should be persisted. BSONObj _fullPreImage; BSONObj _fullPostImage; }; @@ -563,6 +578,14 @@ public: void setPostImageOp(std::shared_ptr<DurableOplogEntry> postImageOp); void setPostImageOp(const BSONObj& postImageOp); + void setTimestampForRetryImage(Timestamp value) & { + _timestampForRetryImage = std::move(value); + } + + boost::optional<Timestamp> getTimestampForRetryImage() const { + return _timestampForRetryImage; + } + std::string toStringForLogging() const; /** @@ -624,6 +647,17 @@ private: std::shared_ptr<DurableOplogEntry> _postImageOp; bool _isForCappedCollection = false; + + // During oplog application on secondaries, oplog entries extracted from each applyOps oplog + // entry for a transaction are given the timestamp of the terminal applyOps oplog entry. + // Similarly, during oplog replay, oplog entries extracted from each applyOps oplog entry for + // a transaction are given the timestamp of the commit oplog entry. As a result, some of those + // oplog entries may have timestamp that is not equal to the timestamp of applyOps oplog entry + // that they corresponds to, and it is incorrect to use that timestamp when writing image + // collection entries. As such, during transaction oplog application, _timestampForRetryImage + // will be used to store the timestamp of the applyOps oplog entry that this operation + // actually corresponds to if an image collection entry is expected to be written. + boost::optional<Timestamp> _timestampForRetryImage = boost::none; }; std::ostream& operator<<(std::ostream& s, const DurableOplogEntry& o); diff --git a/src/mongo/db/repl/transaction_oplog_application.cpp b/src/mongo/db/repl/transaction_oplog_application.cpp index d2fb7f0a178..a838e8566f6 100644 --- a/src/mongo/db/repl/transaction_oplog_application.cpp +++ b/src/mongo/db/repl/transaction_oplog_application.cpp @@ -319,6 +319,12 @@ std::pair<std::vector<OplogEntry>, bool> _readTransactionOperationsFromOplogChai invariant(operationEntry.isPartialTransaction()); auto prevOpsEnd = ops.size(); repl::ApplyOps::extractOperationsTo(operationEntry, lastEntryInTxnObj, &ops); + for (auto opIter = ops.begin() + prevOpsEnd; opIter != ops.end(); ++opIter) { + auto& op = *opIter; + if (op.getNeedsRetryImage()) { + op.setTimestampForRetryImage(operationEntry.getTimestamp()); + } + } // Because BSONArrays do not have fast way of determining size without iterating through // them, and we also have no way of knowing how many oplog entries are in a transaction diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 5af1f1ef48a..b90c1bf4647 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1432,6 +1432,10 @@ void TransactionParticipant::Participant::addTransactionOperation( p().transactionOperationBytes += operation.getPreImage().objsize(); ++p().numberOfPrePostImagesToWrite; } + if (!operation.getPostImage().isEmpty()) { + p().transactionOperationBytes += operation.getPostImage().objsize(); + ++p().numberOfPrePostImagesToWrite; + } auto transactionSizeLimitBytes = gTransactionSizeLimitBytes.load(); uassert(ErrorCodes::TransactionTooLarge, diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index e3be7aa0d03..76bb35f37f9 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -679,6 +679,10 @@ public: return p().transactionOperations; } + size_t getNumberOfPrePostImagesToWriteForTest() const { + return p().numberOfPrePostImagesToWrite; + } + const Locker* getTxnResourceStashLockerForTest() const { invariant(o().txnResourceStash); return o().txnResourceStash->locker(); |