diff options
Diffstat (limited to 'src/mongo')
24 files changed, 841 insertions, 302 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h index 5bf86445b7a..a2a0183f65f 100644 --- a/src/mongo/db/auth/auth_op_observer.h +++ b/src/mongo/db/auth/auth_op_observer.h @@ -188,10 +188,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h index 8f4bd2a5835..920508eef86 100644 --- a/src/mongo/db/fcv_op_observer.h +++ b/src/mongo/db/fcv_op_observer.h @@ -184,10 +184,23 @@ public: OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final{}; - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final{}; + + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final{}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final{}; void onMajorityCommitPointUpdate(ServiceContext* service, diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h index 348633c863b..b266555151c 100644 --- a/src/mongo/db/free_mon/free_mon_op_observer.h +++ b/src/mongo/db/free_mon/free_mon_op_observer.h @@ -188,10 +188,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index 8dd1fec50fd..02c2ba93904 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -786,15 +786,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) { repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == repl::ReplicationCoordinator::modeNone; if (!isStandalone) { - try { - PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->start(); - } catch (ExceptionFor<ErrorCodes::PeriodicJobIsStopped>&) { - LOGV2_WARNING(5869107, "Not starting periodic jobs as shutdown is in progress"); - // Shutdown has already started before initialization is complete. Wait for the - // shutdown task to complete and return. - MONGO_IDLE_THREAD_BLOCK; - return waitForShutdown(); - } + startChangeStreamExpiredPreImagesRemover(serviceContext); } // Set up the logical session cache @@ -1275,16 +1267,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { exec->join(); } - const auto isStandalone = - repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() == - repl::ReplicationCoordinator::modeNone; - if (!isStandalone) { - LOGV2_OPTIONS(5869108, - {LogComponent::kQuery}, - "Shutting down the ChangeStreamExpiredPreImagesRemover"); - PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->stop(); - } - if (auto storageEngine = serviceContext->getStorageEngine()) { if (storageEngine->supportsReadConcernSnapshot()) { LOGV2(4784908, "Shutting down the PeriodicThreadToAbortExpiredTransactions"); @@ -1419,6 +1401,9 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { LOGV2(4784928, "Shutting down the TTL monitor"); shutdownTTLMonitor(serviceContext); + LOGV2(6278511, "Shutting down the Change Stream Expired Pre-images Remover"); + shutdownChangeStreamExpiredPreImagesRemover(serviceContext); + // We should always be able to acquire the global lock at shutdown. // An OperationContext is not necessary to call lockGlobal() during shutdown, as it's only used // to check that lockGlobal() is not called after a transaction timestamp has been set. diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h index f1e8031c440..6c725dfd21f 100644 --- a/src/mongo/db/op_observer.h +++ b/src/mongo/db/op_observer.h @@ -414,6 +414,57 @@ public: const std::vector<repl::ReplOperation>& statements) noexcept = 0; /** + * Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image + * oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be + * written to the oplog, but comprise certain parts of those entries - BSON serialized + * operations, and the assigned oplog slot. The operations in field 'ApplyOpsEntry::operations' + * should be considered opaque outside the OpObserver. + */ + struct ApplyOpsOplogSlotAndOperationAssignment { + struct ApplyOpsEntry { + OplogSlot oplogSlot; + std::vector<BSONObj> operations; + }; + + // Oplog slots to be used for writing pre- and post- image oplog entries. + std::vector<OplogSlot> prePostImageOplogEntryOplogSlots; + + // Representation of "applyOps" oplog entries. + std::vector<ApplyOpsEntry> applyOpsEntries; + + // Number of oplog slots utilized. + size_t numberOfOplogSlotsUsed; + }; + + /** + * This method is called before an atomic transaction is prepared. It must be called when a + * transaction is active. + * + * Optionally returns a representation of "applyOps" entries to be written and oplog slots to be + * used for writing pre- and post- image oplog entries for a transaction. Only one OpObserver in + * the system should return the representation of "applyOps" entries. The returned value is + * passed to 'onTransactionPrepare()'. + * + * The 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction. + * The last reserved slot represents the prepareOpTime used for the prepare oplog entry. + * + * The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a pre-image + * to write as a noop oplog entry. + * + * The 'wallClockTime' is the time to record as wall clock time on oplog entries resulting from + * transaction preparation. + * + * The 'statements' are the list of CRUD operations to be applied in this transaction. The + * operations may be modified by setting pre-image and post-image oplog entry timestamps. + */ + virtual std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) = 0; + + /** * The onTransactionPrepare method is called when an atomic transaction is prepared. It must be * called when a transaction is active. * @@ -422,14 +473,24 @@ public: * * The 'statements' are the list of CRUD operations to be applied in this transaction. * + * The 'applyOpsOperationAssignment' contains a representation of "applyOps" entries and oplog + * slots to be used for writing pre- and post- image oplog entries for a transaction. A value + * returned by 'preTransactionPrepare()' should be passed as 'applyOpsOperationAssignment'. + * * The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a pre-image * to write as a noop oplog entry. The op observer will reserve oplog slots for these * preimages in addition to the statements. + * + * The 'wallClockTime' is the time to record as wall clock time on oplog entries resulting from + * transaction preparation. The same time value should be passed to 'preTransactionPrepare()'. */ - virtual void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) = 0; + virtual void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) = 0; /** * The onTransactionAbort method is called when an atomic transaction aborts, before the diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index dda43be72ee..fac3cd11e1e 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -33,6 +33,7 @@ #include "mongo/db/op_observer_impl.h" +#include <algorithm> #include <limits> #include "mongo/bson/bsonobjbuilder.h" @@ -102,8 +103,12 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) { return clockSource->now(); } -repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) { - oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); +repl::OpTime logOperation(OperationContext* opCtx, + MutableOplogEntry* oplogEntry, + bool assignWallClockTime = true) { + if (assignWallClockTime) { + oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx)); + } auto& times = OpObserver::Times::get(opCtx).reservedOpTimes; auto opTime = repl::logOp(opCtx, oplogEntry); times.push_back(opTime); @@ -1320,8 +1325,8 @@ namespace { */ void writeChangeStreamPreImagesForApplyOpsEntries( OperationContext* opCtx, - const std::vector<repl::ReplOperation>::iterator& stmtBegin, - const std::vector<repl::ReplOperation>::iterator& stmtEnd, + std::vector<repl::ReplOperation>::const_iterator stmtBegin, + std::vector<repl::ReplOperation>::const_iterator stmtEnd, Timestamp applyOpsTimestamp, Date_t operationTime) { int64_t applyOpsIndex{0}; @@ -1342,20 +1347,175 @@ void writeChangeStreamPreImagesForApplyOpsEntries( } } +/** + * Returns transaction operations that can fit into an "applyOps" entry. The returned operations are + * serialized to BSON. The transaction operations are given by range ['operationsBegin', + * 'operationsEnd'). Constraints for fitting the operations: (1) the resulting "applyOps" entry + * shouldn't exceed the 16MB limit, unless only one operation is allocated to it; (2) the number of + * operations is not larger than the maximum number of transaction statements allowed in one entry + * as defined by 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'. + */ +std::vector<BSONObj> packTransactionOperationsIntoApplyOps( + std::vector<repl::ReplOperation>::const_iterator operationsBegin, + std::vector<repl::ReplOperation>::const_iterator operationsEnd) { + tassert(6278503, + "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number", + gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0); + std::vector<BSONObj> operations; + size_t totalOperationsSize{0}; + for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) { + const auto& operation = *operationIter; + + // Stop packing when either number of transaction operations is reached, or when the next + // one would make the total size of operations larger than the maximum BSON Object User + // Size. We rely on the headroom between BSONObjMaxUserSize and BSONObjMaxInternalSize to + // cover the BSON overhead and the other "applyOps" entry fields. But if a single operation + // in the set exceeds BSONObjMaxUserSize, we still fit it, as a single max-length operation + // should be able to be packed into an "applyOps" entry. + if (operations.size() == + static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) || + (operations.size() > 0 && + (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) > + BSONObjMaxUserSize))) { + break; + } + auto serializedOperation = operation.toBSON(); + totalOperationsSize += static_cast<size_t>(serializedOperation.objsize()); + operations.emplace_back(std::move(serializedOperation)); + } + return operations; +} + +/** + * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, their + * assignments to "applyOps" entries, and oplog slots to be used for writing pre- and post- image + * oplog entries for the transaction consisting of 'operations'. Allocates oplog slots from + * 'oplogSlots'. The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a + * pre-image to write as a noop oplog entry. The 'prepare' indicates if the function is called when + * preparing a transaction. + */ +OpObserver::ApplyOpsOplogSlotAndOperationAssignment +getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + OperationContext* opCtx, + const std::vector<OplogSlot>& oplogSlots, + size_t numberOfPrePostImagesToWrite, + bool prepare, + std::vector<repl::ReplOperation>& operations) { + if (operations.empty()) { + return {{}, {}, 0 /*numberOfOplogSlotsUsed*/}; + } + tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size()); + + std::vector<OplogSlot> prePostImageOplogEntryOplogSlots; + std::vector<OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry> applyOpsEntries; + const auto operationCount = operations.size(); + auto oplogSlotIter = oplogSlots.begin(); + auto getNextOplogSlot = [&]() { + tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end()); + return *oplogSlotIter++; + }; + + auto isMigratingTenant = [&opCtx]() { + return static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx)); + }; + + // We never want to store pre-images or post-images when we're migrating oplog entries from + // another replica set. + if (numberOfPrePostImagesToWrite > 0 && !isMigratingTenant()) { + for (size_t operationIdx = 0; operationIdx < operationCount; ++operationIdx) { + auto& statement = operations[operationIdx]; + if (statement.isChangeStreamPreImageRecordedInOplog() || + (statement.isPreImageRecordedForRetryableInternalTransaction() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) { + tassert(6278506, "Expected a pre-image", !statement.getPreImage().isEmpty()); + auto oplogSlot = getNextOplogSlot(); + prePostImageOplogEntryOplogSlots.push_back(oplogSlot); + statement.setPreImageOpTime(oplogSlot); + } + if (!statement.getPostImage().isEmpty() && + statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) { + auto oplogSlot = getNextOplogSlot(); + prePostImageOplogEntryOplogSlots.push_back(oplogSlot); + statement.setPostImageOpTime(oplogSlot); + } + } + } + + auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) { + return static_cast<bool>(operation.getNeedsRetryImage()); + }; + + // Assign operations to "applyOps" entries. + for (auto operationIt = operations.begin(); operationIt != operations.end();) { + auto applyOpsOperations = + packTransactionOperationsIntoApplyOps(operationIt, operations.end()); + const auto opCountWithNeedsRetryImage = + std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage); + if (opCountWithNeedsRetryImage > 0) { + // Reserve a slot for a forged no-op entry. + getNextOplogSlot(); + } + operationIt += applyOpsOperations.size(); + applyOpsEntries.emplace_back( + OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry{ + getNextOplogSlot(), std::move(applyOpsOperations)}); + } + + // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved + // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that. + if (prepare) { + applyOpsEntries.back().oplogSlot = oplogSlots.back(); + } + return {std::move(prePostImageOplogEntryOplogSlots), + std::move(applyOpsEntries), + static_cast<size_t>(oplogSlotIter - oplogSlots.begin())}; +} + +/** + * Writes change stream pre-images for transaction 'operations'. The 'applyOpsOperationAssignment' + * contains a representation of "applyOps" entries to be written for the transaction. The + * 'operationTime' is wall clock time of the operations used for the pre-image documents. + */ +void writeChangeStreamPreImagesForTransaction( + OperationContext* opCtx, + const std::vector<repl::ReplOperation>& operations, + const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment, + Date_t operationTime) { + // This function must be called from an outer WriteUnitOfWork in order to be rolled back upon + // reaching the exception. + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + auto applyOpsEntriesIt = applyOpsOperationAssignment.applyOpsEntries.begin(); + for (auto operationIter = operations.begin(); operationIter != operations.end();) { + tassert(6278507, + "Unexpected end of applyOps entries vector", + applyOpsEntriesIt != applyOpsOperationAssignment.applyOpsEntries.end()); + const auto& applyOpsEntry = *applyOpsEntriesIt++; + const auto operationSequenceEnd = operationIter + applyOpsEntry.operations.size(); + writeChangeStreamPreImagesForApplyOpsEntries(opCtx, + operationIter, + operationSequenceEnd, + applyOpsEntry.oplogSlot.getTimestamp(), + operationTime); + operationIter = operationSequenceEnd; + } +} + // Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array -// field. Appends as many operations as possible to the array (and their corresponding statement -// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the -// maximum number of transaction statements allowed in one entry. If any of the statements has -// a pre-image or post-image that needs to be stored in the image collection, stores it to -// 'imageToWrite'. -// -// Returns an iterator to the first statement that wasn't packed into the applyOps object. -std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( +// field (and their corresponding statement ids to 'stmtIdsWritten'). The transaction statements are +// represented as range ['stmtBegin', 'stmtEnd') and BSON serialized objects 'operations'. If any of +// the statements has a pre-image or post-image that needs to be stored in the image collection, +// stores it to 'imageToWrite'. +void packTransactionStatementsForApplyOps( BSONObjBuilder* applyOpsBuilder, std::vector<StmtId>* stmtIdsWritten, boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite, std::vector<repl::ReplOperation>::iterator stmtBegin, - std::vector<repl::ReplOperation>::iterator stmtEnd) { + std::vector<repl::ReplOperation>::iterator stmtEnd, + const std::vector<BSONObj>& operations) { + tassert(6278508, + "Number of operations does not match the number of transaction statements", + operations.size() == static_cast<size_t>(stmtEnd - stmtBegin)); auto setImageToWrite = [&](const repl::ReplOperation& stmt) { uassert(6054001, str::stream() << NamespaceString::kConfigImagesNamespace @@ -1381,21 +1541,11 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( }; std::vector<repl::ReplOperation>::iterator stmtIter; + auto operationsIter = operations.begin(); BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd)); for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) { const auto& stmt = *stmtIter; - // Stop packing when either number of transaction operations is reached, or when the next - // one would put the array over the maximum BSON Object User Size. We rely on the head room - // between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the BSON overhead and the - // other applyOps fields. But if the array with a single operation exceeds - // BSONObjMaxUserSize, we still log it, as a single max-length operation should be able to - // be applied. - if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry || - (opsArray.arrSize() > 0 && - (opsArray.len() + DurableOplogEntry::getDurableReplOperationSize(stmt) > - BSONObjMaxUserSize))) - break; - opsArray.append(stmt.toBSON()); + opsArray.append(*operationsIter++); const auto stmtIds = stmt.getStatementIds(); stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end()); if (stmt.getNeedsRetryImage()) { @@ -1413,7 +1563,6 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps( e.code() != ErrorCodes::BSONObjectTooLarge); throw; } - return stmtIter; } // Logs one applyOps entry and may update the transactions table. Assumes that the given BSON @@ -1449,7 +1598,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, try { OpTimeBundle times; - times.writeOpTime = logOperation(opCtx, oplogEntry); + times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/); times.wallClockTime = oplogEntry->getWallClockTime(); if (updateTxnTable) { SessionTxnRecord sessionTxnRecord; @@ -1487,6 +1636,10 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, // number of applyOps written will be used. It also expects that the vector of given statements is // non-empty. // +// The 'applyOpsOperationAssignment' contains BSON serialized transaction statements, their +// assigment to "applyOps" oplog entries, and oplog slots to be used for writing pre- and post- +// image oplog entries for a transaction. +// // In the case of writing entries for a prepared transaction, the last oplog entry (i.e. the // implicit prepare) will always be written using the last oplog slot given, even if this means // skipping over some reserved slots. @@ -1496,11 +1649,12 @@ int logOplogEntriesForTransaction( OperationContext* opCtx, std::vector<repl::ReplOperation>* stmts, const std::vector<OplogSlot>& oplogSlots, + const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment, boost::optional<ImageBundle>* prePostImageToWriteToImageCollection, size_t numberOfPrePostImagesToWrite, - bool prepare) { + bool prepare, + Date_t wallClockTime) { invariant(!stmts->empty()); - invariant(stmts->size() <= oplogSlots.size()); // Storage transaction commit is the last place inside a transaction that can throw an // exception. In order to safely allow exceptions to be thrown at that point, this function must @@ -1516,15 +1670,17 @@ int logOplogEntriesForTransaction( invariant(opCtx->lockState()->isWriteLocked()); prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime(); - auto currOplogSlot = oplogSlots.begin(); + auto currPrePostImageOplogEntryOplogSlot = + applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin(); + // We never want to store pre-images or post-images when we're migrating oplog entries from // another replica set. const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx); auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement, const BSONObj& imageDoc) { - auto slot = *currOplogSlot; - ++currOplogSlot; + auto slot = *currPrePostImageOplogEntryOplogSlot; + ++currPrePostImageOplogEntryOplogSlot; MutableOplogEntry imageEntry; imageEntry.setSessionId(*opCtx->getLogicalSessionId()); @@ -1537,7 +1693,7 @@ int logOplogEntriesForTransaction( imageEntry.setOpTime(slot); imageEntry.setDestinedRecipient(statement.getDestinedRecipient()); - return logOperation(opCtx, &imageEntry); + logOperation(opCtx, &imageEntry); }; if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) { @@ -1551,16 +1707,14 @@ int logOplogEntriesForTransaction( // image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the // pre-image will be written to the image collection (after all the applyOps oplog // entries are written). - auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage()); - statement.setPreImageOpTime(opTime); + logPrePostImageNoopEntry(statement, statement.getPreImage()); } if (!statement.getPostImage().isEmpty() && statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) { // Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be // written to the image collection (after all the applyOps oplog entries are // written). - auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage()); - statement.setPostImageOpTime(opTime); + logPrePostImageNoopEntry(statement, statement.getPostImage()); } } } @@ -1573,12 +1727,22 @@ int logOplogEntriesForTransaction( // statements have been packed, it should point to stmts.end(), which is the loop's // termination condition. auto stmtsIter = stmts->begin(); + auto applyOpsIter = applyOpsOperationAssignment.applyOpsEntries.begin(); while (stmtsIter != stmts->end()) { + tassert(6278509, + "Not enough \"applyOps\" entries", + applyOpsIter != applyOpsOperationAssignment.applyOpsEntries.end()); + auto& applyOpsEntry = *applyOpsIter++; BSONObjBuilder applyOpsBuilder; boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite; - auto nextStmt = packTransactionStatementsForApplyOps( - &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end()); + const auto nextStmt = stmtsIter + applyOpsEntry.operations.size(); + packTransactionStatementsForApplyOps(&applyOpsBuilder, + &stmtIdsWritten, + &imageToWrite, + stmtsIter, + nextStmt, + applyOpsEntry.operations); // If we packed the last op, then the next oplog entry we log should be the implicit // commit or implicit prepare, i.e. we omit the 'partialTxn' field. @@ -1590,15 +1754,12 @@ int logOplogEntriesForTransaction( auto isPartialTxn = !lastOp; if (imageToWrite) { - // Reserve an oplog slot for potential forged noop oplog entry for the pre-image or - // post-image. uassert(6054002, str::stream() << NamespaceString::kConfigImagesNamespace << " can only store the pre or post image of one " "findAndModify operation for each " "transaction", !(*prePostImageToWriteToImageCollection)); - ++currOplogSlot; } if (isPartialTxn || (imageToWrite && !prepare)) { @@ -1629,12 +1790,6 @@ int logOplogEntriesForTransaction( // the first and last op. auto updateTxnTable = firstOp || lastOp; - // Use the next reserved oplog slot. In the special case of writing the implicit - // 'prepare' oplog entry, we use the last reserved oplog slot, since callers of this - // function will expect that timestamp to be used as the 'prepare' timestamp. This - // may mean we skipped over some reserved slots, but there's no harm in that. - auto oplogSlot = implicitPrepare ? oplogSlots.back() : *currOplogSlot++; - // The first optime of the transaction is always the first oplog slot, except in the // case of a single prepare oplog entry. auto firstOpTimeOfTxn = @@ -1646,8 +1801,9 @@ int logOplogEntriesForTransaction( auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn); MutableOplogEntry oplogEntry; - oplogEntry.setOpTime(oplogSlot); + oplogEntry.setOpTime(applyOpsEntry.oplogSlot); oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime); + oplogEntry.setWallClockTime(wallClockTime); oplogEntry.setObject(applyOpsBuilder.done()); auto txnState = isPartialTxn ? DurableTxnStateEnum::kInProgress @@ -1670,15 +1826,11 @@ int logOplogEntriesForTransaction( prevWriteOpTime.writeOpTime.getTimestamp()}; } - const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp(); - writeChangeStreamPreImagesForApplyOpsEntries( - opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime()); - // Advance the iterator to the beginning of the remaining unpacked statements. stmtsIter = nextStmt; } - return currOplogSlot - oplogSlots.begin(); + return applyOpsOperationAssignment.numberOfOplogSlotsUsed; } void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, @@ -1759,10 +1911,27 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx, uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled"); } + // Serialize transaction statements to BSON and determine their assignment to "applyOps" + // entries. + const auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, oplogSlots, numberOfPrePostImagesToWrite, false /*prepare*/, *statements); + + const auto wallClockTime = getWallClockTimeForOpLog(opCtx); + writeChangeStreamPreImagesForTransaction( + opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime); + // Log in-progress entries for the transaction along with the implicit commit. boost::optional<ImageBundle> imageToWrite; - int numOplogEntries = logOplogEntriesForTransaction( - opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false); + int numOplogEntries = logOplogEntriesForTransaction(opCtx, + statements, + oplogSlots, + applyOpsOplogSlotAndOperationAssignment, + &imageToWrite, + numberOfPrePostImagesToWrite, + false /* prepare*/, + wallClockTime); + if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1799,14 +1968,35 @@ void OpObserverImpl::onPreparedTransactionCommit( logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted); } -void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) { +std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> +OpObserverImpl::preTransactionPrepare(OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) { + auto applyOpsOplogSlotAndOperationAssignment = + getApplyOpsOplogSlotAndOperationAssignmentForTransaction( + opCtx, reservedSlots, numberOfPrePostImagesToWrite, true /*prepare*/, *statements); + writeChangeStreamPreImagesForTransaction( + opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime); + return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>( + std::move(applyOpsOplogSlotAndOperationAssignment)); +} + +void OpObserverImpl::onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) { invariant(!reservedSlots.empty()); const auto prepareOpTime = reservedSlots.back(); invariant(opCtx->getTxnNumber()); invariant(!prepareOpTime.isNull()); + tassert(6278510, + "Operation assignments to applyOps entries should be present", + applyOpsOperationAssignment); // Don't write oplog entry on secondaries. if (!opCtx->writesAreReplicated()) { @@ -1837,9 +2027,11 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, logOplogEntriesForTransaction(opCtx, statements, reservedSlots, + *applyOpsOperationAssignment, &imageToWrite, numberOfPrePostImagesToWrite, - true /* prepare */); + true /* prepare */, + wallClockTime); if (imageToWrite) { writeToImageCollection(opCtx, *opCtx->getLogicalSessionId(), @@ -1861,6 +2053,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx, oplogEntry.setOpTime(oplogSlot); oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime()); oplogEntry.setObject(applyOpsBuilder.done()); + oplogEntry.setWallClockTime(wallClockTime); logApplyOpsForTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kPrepared, diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h index f4177ad608e..7d40fe84307 100644 --- a/src/mongo/db/op_observer_impl.h +++ b/src/mongo/db/op_observer_impl.h @@ -192,10 +192,21 @@ public: OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final; - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final; + + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final; + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final; void onMajorityCommitPointUpdate(ServiceContext* service, diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index f37e9e7cada..4b2be83993e 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -865,6 +865,22 @@ protected: return *_txnParticipant; } + void prepareTransaction(const std::vector<OplogSlot>& reservedSlots, + repl::OpTime prepareOpTime, + size_t numberOfPrePostImagesToWrite = 0) { + auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx()); + auto currentTime = Date_t::now(); + auto applyOpsAssignment = opObserver().preTransactionPrepare( + opCtx(), reservedSlots, numberOfPrePostImagesToWrite, currentTime, &txnOps); + opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); + opObserver().onTransactionPrepare(opCtx(), + reservedSlots, + &txnOps, + applyOpsAssignment.get(), + numberOfPrePostImagesToWrite, + currentTime); + } + private: class ExposeOpObserverTimes : public OpObserver { public: @@ -1008,9 +1024,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 5); auto prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -1069,8 +1083,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot); prepareTimestamp = prepareSlot.getTimestamp(); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0); + prepareTransaction({prepareSlot}, prepareSlot); commitSlot = repl::getNextOpTime(opCtx()); } @@ -1138,8 +1151,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0); + prepareTransaction({prepareSlot}, prepareSlot); abortSlot = repl::getNextOpTime(opCtx()); } @@ -1218,9 +1230,7 @@ TEST_F(OpObserverTransactionTest, WriteUnitOfWork wuow(opCtx()); prepareOpTime = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {prepareOpTime}, &txnOps, 0); + prepareTransaction({prepareOpTime}, prepareOpTime); } auto oplogEntryObj = getSingleOplogEntry(opCtx()); @@ -1251,9 +1261,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable) OplogSlot slot = repl::getNextOpTime(opCtx()); txnParticipant.transitionToPreparedforTest(opCtx(), slot); prepareOpTime = slot; - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); + prepareTransaction({slot}, prepareOpTime); } ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp()); @@ -1284,9 +1292,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction Lock::GlobalLock lk(opCtx(), MODE_IX); WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); + prepareTransaction({slot}, slot); txnParticipant.transitionToPreparedforTest(opCtx(), slot); abortSlot = repl::getNextOpTime(opCtx()); } @@ -1358,9 +1364,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti WriteUnitOfWork wuow(opCtx()); OplogSlot slot = repl::getNextOpTime(opCtx()); prepareOpTime = slot; - opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0); + prepareTransaction({slot}, slot); txnParticipant.transitionToPreparedforTest(opCtx(), slot); } @@ -1564,7 +1568,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant.unstashTransactionResources(opCtx(), "insert"); std::vector<InsertStatement> inserts; - inserts.emplace_back(0, BSON("_id" << 0)); + inserts.emplace_back(0, BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a'))); WriteUnitOfWork wuow(opCtx()); AutoGetCollection autoColl1(opCtx(), nss, MODE_IX); @@ -1579,11 +1583,12 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { ASSERT_EQ(repl::OpTime(), *oplogEntry.getPrevWriteOpTimeInTransaction()); // The implicit commit oplog entry. - auto oExpected = - BSON("applyOps" << BSON_ARRAY(BSON("op" - << "i" - << "ns" << nss.toString() << "ui" << uuid << "o" - << BSON("_id" << 0) << "o2" << BSON("_id" << 0)))); + auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON( + "op" + << "i" + << "ns" << nss.toString() << "ui" << uuid << "o" + << BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a')) + << "o2" << BSON("_id" << 0)))); ASSERT_BSONOBJ_EQ(oExpected, oplogEntry.getObject()); } @@ -1761,14 +1766,14 @@ public: protected: void commit() final { - const auto prepareSlot = repl::getNextOpTime(opCtx()); + const auto reservedOplogSlots = repl::getNextOpTimes( + opCtx(), 1 + txnParticipant().getNumberOfPrePostImagesToWriteForTest()); + invariant(reservedOplogSlots.size() >= 1); + const auto prepareSlot = reservedOplogSlots.back(); txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot); - auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare( - opCtx(), - {prepareSlot}, - &txnOps, - txnParticipant().getNumberOfPrePostImagesToWriteForTest()); + prepareTransaction(reservedOplogSlots, + prepareSlot, + txnParticipant().getNumberOfPrePostImagesToWriteForTest()); }; BSONObj assertGetSingleOplogEntry() final { @@ -2874,9 +2879,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPreImageTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 4); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 2); + prepareTransaction(reservedSlots, prepareOpTime, 2); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); @@ -3001,9 +3004,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 4); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); std::vector<OplogEntry> oplogEntries; @@ -3093,9 +3094,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; @@ -3169,9 +3168,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) { prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); prepareOpTime = reservedSlots.back(); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); @@ -3232,10 +3229,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); @@ -3315,10 +3309,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 1); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); } auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); @@ -3448,9 +3439,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 4); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); auto oplogEntryObj = getSingleOplogEntry(opCtx()); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -3505,10 +3494,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) { auto reservedSlots = repl::getNextOpTimes(opCtx(), 2); prepareOpTime = reservedSlots.back(); txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime); - - opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); - auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0); + prepareTransaction(reservedSlots, prepareOpTime); auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h index f458d891846..ab610264ce1 100644 --- a/src/mongo/db/op_observer_noop.h +++ b/src/mongo/db/op_observer_noop.h @@ -166,10 +166,21 @@ public: OplogSlot commitOplogEntryOpTime, Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override{}; - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override{}; + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override { + return nullptr; + } + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override{}; void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override{}; void onMajorityCommitPointUpdate(ServiceContext* service, diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h index c5f68467066..5186d6a3f47 100644 --- a/src/mongo/db/op_observer_registry.h +++ b/src/mongo/db/op_observer_registry.h @@ -384,14 +384,42 @@ public: opCtx, commitOplogEntryOpTime, commitTimestamp, statements); } - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override { + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override { + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> + applyOpsOplogSlotAndOperationAssignment; + for (auto&& observer : _observers) { + auto applyOpsAssignment = observer->preTransactionPrepare( + opCtx, reservedSlots, numberOfPrePostImagesToWrite, wallClockTime, statements); + tassert(6278501, + "More than one OpObserver returned operation to \"applyOps\" assignment", + !(applyOpsAssignment && applyOpsOplogSlotAndOperationAssignment)); + if (applyOpsAssignment) { + applyOpsOplogSlotAndOperationAssignment = std::move(applyOpsAssignment); + } + } + return applyOpsOplogSlotAndOperationAssignment; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override { ReservedTimes times{opCtx}; for (auto& observer : _observers) { - observer->onTransactionPrepare( - opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite); + observer->onTransactionPrepare(opCtx, + reservedSlots, + statements, + applyOpsOperationAssignment, + numberOfPrePostImagesToWrite, + wallClockTime); } } diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp index 9027c8d52de..91f99d83668 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp @@ -33,18 +33,22 @@ #include "change_stream_expired_pre_image_remover.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/catalog/collection.h" #include "mongo/db/change_stream_options_manager.h" #include "mongo/db/client.h" #include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/db_raii.h" +#include "mongo/db/namespace_string.h" #include "mongo/db/pipeline/change_stream_preimage_gen.h" #include "mongo/db/query/internal_plans.h" #include "mongo/db/record_id_helpers.h" -#include "mongo/db/repl/oplog_entry_gen.h" +#include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/repl/storage_interface.h" #include "mongo/db/service_context.h" #include "mongo/logv2/log.h" -#include "mongo/s/cluster_commands_helpers.h" +#include "mongo/util/background.h" +#include "mongo/util/concurrency/idle_thread_block.h" #include "mongo/util/fail_point.h" namespace mongo { @@ -374,66 +378,152 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim "numberOfRemovals"_attr = numberOfRemovals, "jobDuration"_attr = (Date_t::now() - startTime).toString()); } -} // namespace -PeriodicChangeStreamExpiredPreImagesRemover& PeriodicChangeStreamExpiredPreImagesRemover::get( - ServiceContext* serviceContext) { - auto& jobContainer = _serviceDecoration(serviceContext); - jobContainer._init(serviceContext); - return jobContainer; +void performExpiredChangeStreamPreImagesRemovalPass(Client* client) { + try { + Date_t currentTimeForTimeBasedExpiration = Date_t::now(); + + changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { + // Populate the current time for time based expiration of pre-images. + if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { + const BSONType bsonType = currentTimeElem.type(); + tassert(5869300, + str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is " + "'date', but found: " + << bsonType, + bsonType == BSONType::Date); + + currentTimeForTimeBasedExpiration = currentTimeElem.Date(); + } + }); + deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); + } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { + LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); + } catch (const DBException& exception) { + LOGV2_ERROR(5869106, + "Periodic expired pre-images removal job failed", + "reason"_attr = exception.reason()); + } } +} // namespace -PeriodicJobAnchor& PeriodicChangeStreamExpiredPreImagesRemover::operator*() const noexcept { - stdx::lock_guard lk(_mutex); - return *_anchor; -} +class ChangeStreamExpiredPreImagesRemover; -PeriodicJobAnchor* PeriodicChangeStreamExpiredPreImagesRemover::operator->() const noexcept { - stdx::lock_guard lk(_mutex); - return _anchor.get(); -} +namespace { +const auto getChangeStreamExpiredPreImagesRemover = + ServiceContext::declareDecoration<std::unique_ptr<ChangeStreamExpiredPreImagesRemover>>(); +} // namespace -void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceContext) { - stdx::lock_guard lk(_mutex); - if (_anchor) { - return; +/** + * A periodic background job that removes expired change stream pre-image documents from the + * 'system.preimages' collection. The period of the job is controlled by the server parameter + * "expiredChangeStreamPreImageRemovalJobSleepSecs". + */ +class ChangeStreamExpiredPreImagesRemover : public BackgroundJob { +public: + explicit ChangeStreamExpiredPreImagesRemover() : BackgroundJob(false /* selfDelete */) {} + + /** + * Retrieves ChangeStreamExpiredPreImagesRemover from the service context 'serviceCtx'. + */ + static ChangeStreamExpiredPreImagesRemover* get(ServiceContext* serviceCtx) { + return getChangeStreamExpiredPreImagesRemover(serviceCtx).get(); } - auto periodicRunner = serviceContext->getPeriodicRunner(); - invariant(periodicRunner); - - PeriodicRunner::PeriodicJob job( - "ChangeStreamExpiredPreImagesRemover", - [](Client* client) { - try { - Date_t currentTimeForTimeBasedExpiration = Date_t::now(); - - changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) { - // Populate the current time for time based expiration of pre-images. - if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) { - const BSONType bsonType = currentTimeElem.type(); - tassert(5869300, - str::stream() - << "Expected type for 'currentTimeForTimeBasedExpiration' is " - "'date', but found: " - << bsonType, - bsonType == BSONType::Date); - - currentTimeForTimeBasedExpiration = currentTimeElem.Date(); - } - }); - - deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration); - } catch (const ExceptionForCat<ErrorCategory::Interruption>&) { - LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted"); - } catch (const DBException& exception) { - LOGV2_ERROR(5869106, - "Periodic expired pre-images removal job failed", - "reason"_attr = exception.reason()); + /** + * Sets ChangeStreamExpiredPreImagesRemover 'preImagesRemover' to the service context + * 'serviceCtx'. + */ + static void set(ServiceContext* serviceCtx, + std::unique_ptr<ChangeStreamExpiredPreImagesRemover> preImagesRemover) { + auto& changeStreamExpiredPreImagesRemover = + getChangeStreamExpiredPreImagesRemover(serviceCtx); + if (changeStreamExpiredPreImagesRemover) { + invariant(!changeStreamExpiredPreImagesRemover->running(), + "Tried to reset the ChangeStreamExpiredPreImagesRemover without shutting " + "down the original instance."); + } + + invariant(preImagesRemover); + changeStreamExpiredPreImagesRemover = std::move(preImagesRemover); + } + + std::string name() const { + return "ChangeStreamExpiredPreImagesRemover"; + } + + void run() { + ThreadClient tc(name(), getGlobalServiceContext()); + AuthorizationSession::get(cc())->grantInternalAuthorization(&cc()); + + { + stdx::lock_guard<Client> lk(*tc.get()); + tc.get()->setSystemOperationKillableByStepdown(lk); + } + + while (true) { + LOGV2_DEBUG(6278517, 3, "Thread awake"); + auto iterationStartTime = Date_t::now(); + performExpiredChangeStreamPreImagesRemovalPass(tc.get()); + { + // Wait until either gExpiredChangeStreamPreImageRemovalJobSleepSecs passes or a + // shutdown is requested. + auto deadline = iterationStartTime + + Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load()); + stdx::unique_lock<Latch> lk(_stateMutex); + + MONGO_IDLE_THREAD_BLOCK; + _shuttingDownCV.wait_until( + lk, deadline.toSystemTimePoint(), [&] { return _shuttingDown; }); + + if (_shuttingDown) { + return; + } } - }, - Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load())); + } + } + + /** + * Signals the thread to quit and then waits until it does. + */ + void shutdown() { + LOGV2(6278515, "Shutting down Change Stream Expired Pre-images Remover thread"); + { + stdx::lock_guard<Latch> lk(_stateMutex); + _shuttingDown = true; + } + _shuttingDownCV.notify_one(); + wait(); + LOGV2(6278516, "Finished shutting down Change Stream Expired Pre-images Remover thread"); + } - _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job))); +private: + // Protects the state below. + mutable Mutex _stateMutex = MONGO_MAKE_LATCH("ChangeStreamExpiredPreImagesRemoverStateMutex"); + + // Signaled to wake up the thread, if the thread is waiting. The thread will check whether + // _shuttingDown is set and stop accordingly. + mutable stdx::condition_variable _shuttingDownCV; + + bool _shuttingDown = false; +}; + +void startChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext) { + std::unique_ptr<ChangeStreamExpiredPreImagesRemover> changeStreamExpiredPreImagesRemover = + std::make_unique<ChangeStreamExpiredPreImagesRemover>(); + changeStreamExpiredPreImagesRemover->go(); + ChangeStreamExpiredPreImagesRemover::set(serviceContext, + std::move(changeStreamExpiredPreImagesRemover)); } + +void shutdownChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext) { + ChangeStreamExpiredPreImagesRemover* changeStreamExpiredPreImagesRemover = + ChangeStreamExpiredPreImagesRemover::get(serviceContext); + // We allow the ChangeStreamExpiredPreImagesRemover not to be set in case shutdown occurs before + // the thread has been initialized. + if (changeStreamExpiredPreImagesRemover) { + changeStreamExpiredPreImagesRemover->shutdown(); + } +} + } // namespace mongo diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h index 57f6442328f..89849552b0a 100644 --- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h +++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h @@ -31,12 +31,8 @@ #include "mongo/db/commands/change_stream_options_gen.h" #include "mongo/db/service_context.h" -#include "mongo/platform/mutex.h" -#include "mongo/util/hierarchical_acquisition.h" -#include "mongo/util/periodic_runner.h" namespace mongo { - namespace preImageRemoverInternal { /** @@ -59,28 +55,14 @@ boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_ } // namespace preImageRemoverInternal -class ServiceContext; - /** - * A periodic background job to remove expired documents from 'system.preimages' collection. A - * document in this collection is considered expired if its corresponding oplog entry falls off the - * oplog. + * Starts a periodic background job to remove expired documents from 'system.preimages' collection. */ -class PeriodicChangeStreamExpiredPreImagesRemover final { -public: - static PeriodicChangeStreamExpiredPreImagesRemover& get(ServiceContext* serviceContext); - - PeriodicJobAnchor& operator*() const noexcept; - PeriodicJobAnchor* operator->() const noexcept; - -private: - void _init(ServiceContext* serviceContext); +void startChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext); - inline static const auto _serviceDecoration = - ServiceContext::declareDecoration<PeriodicChangeStreamExpiredPreImagesRemover>(); - - mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), - "PeriodicChangeStreamExpiredPreImagesRemover::_mutex"); - std::shared_ptr<PeriodicJobAnchor> _anchor; -}; +/** + * Stops the periodic background job that removes expired documents from 'system.preimages' + * collection. + */ +void shutdownChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext); } // namespace mongo diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h index f434e29e34d..a81b66e04cd 100644 --- a/src/mongo/db/repl/primary_only_service_op_observer.h +++ b/src/mongo/db/repl/primary_only_service_op_observer.h @@ -190,10 +190,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h index 90e3a49c743..6ab805ee967 100644 --- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h @@ -187,10 +187,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h index 76567db5b95..b7b0a88ca96 100644 --- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h +++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h @@ -189,10 +189,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h index 71ba94767a9..776ce97af3d 100644 --- a/src/mongo/db/s/config_server_op_observer.h +++ b/src/mongo/db/s/config_server_op_observer.h @@ -190,10 +190,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h index 88101c36b32..be4dfc18505 100644 --- a/src/mongo/db/s/resharding/resharding_op_observer.h +++ b/src/mongo/db/s/resharding/resharding_op_observer.h @@ -210,10 +210,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h index 8dabf371829..684ef8153bf 100644 --- a/src/mongo/db/s/shard_server_op_observer.h +++ b/src/mongo/db/s/shard_server_op_observer.h @@ -189,10 +189,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept override {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) override {} diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h index 1b24dab4140..27b05527cce 100644 --- a/src/mongo/db/serverless/shard_split_donor_op_observer.h +++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h @@ -186,10 +186,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index f154c9695d2..78864240462 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1613,11 +1613,24 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( hangAfterReservingPrepareTimestamp.pauseWhileSet(); } } + auto opObserver = opCtx->getServiceContext()->getOpObserver(); + const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now(); + auto applyOpsOplogSlotAndOperationAssignment = + opObserver->preTransactionPrepare(opCtx, + reservedSlots, + p().numberOfPrePostImagesToWrite, + wallClockTime, + &completedTransactionOperations); + opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); p().needToWriteAbortEntry = true; - opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( - opCtx, reservedSlots, &completedTransactionOperations, p().numberOfPrePostImagesToWrite); + opObserver->onTransactionPrepare(opCtx, + reservedSlots, + &completedTransactionOperations, + applyOpsOplogSlotAndOperationAssignment.get(), + p().numberOfPrePostImagesToWrite, + wallClockTime); abortGuard.dismiss(); diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index b2b57970e94..eedc392c560 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -93,13 +93,20 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override { + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare( - opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite); + OpObserverNoop::onTransactionPrepare(opCtx, + reservedSlots, + statements, + applyOpsOperationAssignment, + numberOfPrePostImagesToWrite, + Date_t::now()); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 6d2df9f6ec5..60d1a24f408 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -103,10 +103,20 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) override; + std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) override; + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) override; bool onTransactionPrepareThrowsException = false; bool transactionPrepared = false; @@ -148,13 +158,30 @@ public: const repl::OpTime dropOpTime = {Timestamp(Seconds(100), 1U), 1LL}; }; -void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) { +std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> +OpObserverMock::preTransactionPrepare(OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) { + return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>( + OpObserver::ApplyOpsOplogSlotAndOperationAssignment{{}, {}}); +} + +void OpObserverMock::onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) { ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare( - opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite); + OpObserverNoop::onTransactionPrepare(opCtx, + reservedSlots, + statements, + applyOpsOperationAssignment, + numberOfPrePostImagesToWrite, + wallClockTime); uassert(ErrorCodes::OperationFailed, "onTransactionPrepare() failed", diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h index d5ad0a2ecaa..adef807ed97 100644 --- a/src/mongo/db/user_write_block_mode_op_observer.h +++ b/src/mongo/db/user_write_block_mode_op_observer.h @@ -197,10 +197,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h index 4519de522a7..932e99f44a3 100644 --- a/src/mongo/idl/cluster_server_parameter_op_observer.h +++ b/src/mongo/idl/cluster_server_parameter_op_observer.h @@ -201,10 +201,22 @@ public: Timestamp commitTimestamp, const std::vector<repl::ReplOperation>& statements) noexcept final {} - void onTransactionPrepare(OperationContext* opCtx, - const std::vector<OplogSlot>& reservedSlots, - std::vector<repl::ReplOperation>* statements, - size_t numberOfPrePostImagesToWrite) final {} + std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime, + std::vector<repl::ReplOperation>* statements) final { + return nullptr; + } + + void onTransactionPrepare( + OperationContext* opCtx, + const std::vector<OplogSlot>& reservedSlots, + std::vector<repl::ReplOperation>* statements, + const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment, + size_t numberOfPrePostImagesToWrite, + Date_t wallClockTime) final {} void onTransactionAbort(OperationContext* opCtx, boost::optional<OplogSlot> abortOplogEntryOpTime) final {} |