summary refs log tree commit diff
path: root/src
diff options
context:
space:
mode:
author: Mindaugas Malinauskas <mindaugas.malinauskas@mongodb.com> 2022-02-16 10:57:21 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2022-03-23 11:27:17 +0000
commit: f65692d2b92ecaf505dc73f82d5490f9528f5fe2 (patch)
tree: 7b460734da63f0e44af88440c78ae0980d2cf4c2 /src
parent: c3fee8287bf070e1fa6309513a4e6d8100c440ee (diff)
download: mongo-f65692d2b92ecaf505dc73f82d5490f9528f5fe2.tar.gz
SERVER-62785 Write change stream pre-images in the main storage engine transaction for prepared transactions
Diffstat (limited to 'src')
-rw-r--r-- src/mongo/db/auth/auth_op_observer.h 20
-rw-r--r-- src/mongo/db/fcv_op_observer.h 21
-rw-r--r-- src/mongo/db/free_mon/free_mon_op_observer.h 20
-rw-r--r-- src/mongo/db/mongod_main.cpp 23
-rw-r--r-- src/mongo/db/op_observer.h 69
-rw-r--r-- src/mongo/db/op_observer_impl.cpp 315
-rw-r--r-- src/mongo/db/op_observer_impl.h 19
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp 104
-rw-r--r-- src/mongo/db/op_observer_noop.h 19
-rw-r--r-- src/mongo/db/op_observer_registry.h 40
-rw-r--r-- src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp 198
-rw-r--r-- src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h 32
-rw-r--r-- src/mongo/db/repl/primary_only_service_op_observer.h 20
-rw-r--r-- src/mongo/db/repl/tenant_migration_donor_op_observer.h 20
-rw-r--r-- src/mongo/db/repl/tenant_migration_recipient_op_observer.h 20
-rw-r--r-- src/mongo/db/s/config_server_op_observer.h 20
-rw-r--r-- src/mongo/db/s/resharding/resharding_op_observer.h 20
-rw-r--r-- src/mongo/db/s/shard_server_op_observer.h 20
-rw-r--r-- src/mongo/db/serverless/shard_split_donor_op_observer.h 20
-rw-r--r-- src/mongo/db/transaction_participant.cpp 17
-rw-r--r-- src/mongo/db/transaction_participant_retryable_writes_test.cpp 19
-rw-r--r-- src/mongo/db/transaction_participant_test.cpp 47
-rw-r--r-- src/mongo/db/user_write_block_mode_op_observer.h 20
-rw-r--r-- src/mongo/idl/cluster_server_parameter_op_observer.h 20
24 files changed, 841 insertions, 302 deletions
diff --git a/src/mongo/db/auth/auth_op_observer.h b/src/mongo/db/auth/auth_op_observer.h
index 5bf86445b7a..a2a0183f65f 100644
--- a/src/mongo/db/auth/auth_op_observer.h
+++ b/src/mongo/db/auth/auth_op_observer.h
@@ -188,10 +188,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/fcv_op_observer.h b/src/mongo/db/fcv_op_observer.h
index 8f4bd2a5835..920508eef86 100644
--- a/src/mongo/db/fcv_op_observer.h
+++ b/src/mongo/db/fcv_op_observer.h
@@ -184,10 +184,23 @@ public:
OplogSlot commitOplogEntryOpTime,
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final{};
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final{};
+
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final{};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final{};
void onMajorityCommitPointUpdate(ServiceContext* service,
diff --git a/src/mongo/db/free_mon/free_mon_op_observer.h b/src/mongo/db/free_mon/free_mon_op_observer.h
index 348633c863b..b266555151c 100644
--- a/src/mongo/db/free_mon/free_mon_op_observer.h
+++ b/src/mongo/db/free_mon/free_mon_op_observer.h
@@ -188,10 +188,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 8dd1fec50fd..02c2ba93904 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -786,15 +786,7 @@ ExitCode _initAndListen(ServiceContext* serviceContext, int listenPort) {
repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
repl::ReplicationCoordinator::modeNone;
if (!isStandalone) {
- try {
- PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->start();
- } catch (ExceptionFor<ErrorCodes::PeriodicJobIsStopped>&) {
- LOGV2_WARNING(5869107, "Not starting periodic jobs as shutdown is in progress");
- // Shutdown has already started before initialization is complete. Wait for the
- // shutdown task to complete and return.
- MONGO_IDLE_THREAD_BLOCK;
- return waitForShutdown();
- }
+ startChangeStreamExpiredPreImagesRemover(serviceContext);
}
// Set up the logical session cache
@@ -1275,16 +1267,6 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
exec->join();
}
- const auto isStandalone =
- repl::ReplicationCoordinator::get(serviceContext)->getReplicationMode() ==
- repl::ReplicationCoordinator::modeNone;
- if (!isStandalone) {
- LOGV2_OPTIONS(5869108,
- {LogComponent::kQuery},
- "Shutting down the ChangeStreamExpiredPreImagesRemover");
- PeriodicChangeStreamExpiredPreImagesRemover::get(serviceContext)->stop();
- }
-
if (auto storageEngine = serviceContext->getStorageEngine()) {
if (storageEngine->supportsReadConcernSnapshot()) {
LOGV2(4784908, "Shutting down the PeriodicThreadToAbortExpiredTransactions");
@@ -1419,6 +1401,9 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
LOGV2(4784928, "Shutting down the TTL monitor");
shutdownTTLMonitor(serviceContext);
+ LOGV2(6278511, "Shutting down the Change Stream Expired Pre-images Remover");
+ shutdownChangeStreamExpiredPreImagesRemover(serviceContext);
+
// We should always be able to acquire the global lock at shutdown.
// An OperationContext is not necessary to call lockGlobal() during shutdown, as it's only used
// to check that lockGlobal() is not called after a transaction timestamp has been set.
diff --git a/src/mongo/db/op_observer.h b/src/mongo/db/op_observer.h
index f1e8031c440..6c725dfd21f 100644
--- a/src/mongo/db/op_observer.h
+++ b/src/mongo/db/op_observer.h
@@ -414,6 +414,57 @@ public:
const std::vector<repl::ReplOperation>& statements) noexcept = 0;
/**
+ * Contains "applyOps" oplog entries and oplog slots to be used for writing pre- and post- image
+ * oplog entries for a transaction. "applyOps" entries are not actual "applyOps" entries to be
+ * written to the oplog, but comprise certain parts of those entries - BSON serialized
+ * operations, and the assigned oplog slot. The operations in field 'ApplyOpsEntry::operations'
+ * should be considered opaque outside the OpObserver.
+ */
+ struct ApplyOpsOplogSlotAndOperationAssignment {
+ struct ApplyOpsEntry {
+ OplogSlot oplogSlot;
+ std::vector<BSONObj> operations;
+ };
+
+ // Oplog slots to be used for writing pre- and post- image oplog entries.
+ std::vector<OplogSlot> prePostImageOplogEntryOplogSlots;
+
+ // Representation of "applyOps" oplog entries.
+ std::vector<ApplyOpsEntry> applyOpsEntries;
+
+ // Number of oplog slots utilized.
+ size_t numberOfOplogSlotsUsed;
+ };
+
+ /**
+ * This method is called before an atomic transaction is prepared. It must be called when a
+ * transaction is active.
+ *
+ * Optionally returns a representation of "applyOps" entries to be written and oplog slots to be
+ * used for writing pre- and post- image oplog entries for a transaction. Only one OpObserver in
+ * the system should return the representation of "applyOps" entries. The returned value is
+ * passed to 'onTransactionPrepare()'.
+ *
+ * The 'reservedSlots' is a list of oplog slots reserved for the oplog entries in a transaction.
+ * The last reserved slot represents the prepareOpTime used for the prepare oplog entry.
+ *
+ * The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a pre-image
+ * to write as a noop oplog entry.
+ *
+ * The 'wallClockTime' is the time to record as wall clock time on oplog entries resulting from
+ * transaction preparation.
+ *
+ * The 'statements' are the list of CRUD operations to be applied in this transaction. The
+ * operations may be modified by setting pre-image and post-image oplog entry timestamps.
+ */
+ virtual std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) = 0;
+
+ /**
* The onTransactionPrepare method is called when an atomic transaction is prepared. It must be
* called when a transaction is active.
*
@@ -422,14 +473,24 @@ public:
*
* The 'statements' are the list of CRUD operations to be applied in this transaction.
*
+ * The 'applyOpsOperationAssignment' contains a representation of "applyOps" entries and oplog
+ * slots to be used for writing pre- and post- image oplog entries for a transaction. A value
+ * returned by 'preTransactionPrepare()' should be passed as 'applyOpsOperationAssignment'.
+ *
* The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a pre-image
* to write as a noop oplog entry. The op observer will reserve oplog slots for these
* preimages in addition to the statements.
+ *
+ * The 'wallClockTime' is the time to record as wall clock time on oplog entries resulting from
+ * transaction preparation. The same time value should be passed to 'preTransactionPrepare()'.
*/
- virtual void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) = 0;
+ virtual void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) = 0;
/**
* The onTransactionAbort method is called when an atomic transaction aborts, before the
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index dda43be72ee..fac3cd11e1e 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/op_observer_impl.h"
+#include <algorithm>
#include <limits>
#include "mongo/bson/bsonobjbuilder.h"
@@ -102,8 +103,12 @@ Date_t getWallClockTimeForOpLog(OperationContext* opCtx) {
return clockSource->now();
}
-repl::OpTime logOperation(OperationContext* opCtx, MutableOplogEntry* oplogEntry) {
- oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+repl::OpTime logOperation(OperationContext* opCtx,
+ MutableOplogEntry* oplogEntry,
+ bool assignWallClockTime = true) {
+ if (assignWallClockTime) {
+ oplogEntry->setWallClockTime(getWallClockTimeForOpLog(opCtx));
+ }
auto& times = OpObserver::Times::get(opCtx).reservedOpTimes;
auto opTime = repl::logOp(opCtx, oplogEntry);
times.push_back(opTime);
@@ -1320,8 +1325,8 @@ namespace {
*/
void writeChangeStreamPreImagesForApplyOpsEntries(
OperationContext* opCtx,
- const std::vector<repl::ReplOperation>::iterator& stmtBegin,
- const std::vector<repl::ReplOperation>::iterator& stmtEnd,
+ std::vector<repl::ReplOperation>::const_iterator stmtBegin,
+ std::vector<repl::ReplOperation>::const_iterator stmtEnd,
Timestamp applyOpsTimestamp,
Date_t operationTime) {
int64_t applyOpsIndex{0};
@@ -1342,20 +1347,175 @@ void writeChangeStreamPreImagesForApplyOpsEntries(
}
}
+/**
+ * Returns transaction operations that can fit into an "applyOps" entry. The returned operations are
+ * serialized to BSON. The transaction operations are given by range ['operationsBegin',
+ * 'operationsEnd'). Constraints for fitting the operations: (1) the resulting "applyOps" entry
+ * shouldn't exceed the 16MB limit, unless only one operation is allocated to it; (2) the number of
+ * operations is not larger than the maximum number of transaction statements allowed in one entry
+ * as defined by 'gMaxNumberOfTransactionOperationsInSingleOplogEntry'.
+ */
+std::vector<BSONObj> packTransactionOperationsIntoApplyOps(
+ std::vector<repl::ReplOperation>::const_iterator operationsBegin,
+ std::vector<repl::ReplOperation>::const_iterator operationsEnd) {
+ tassert(6278503,
+ "gMaxNumberOfTransactionOperationsInSingleOplogEntry should be positive number",
+ gMaxNumberOfTransactionOperationsInSingleOplogEntry > 0);
+ std::vector<BSONObj> operations;
+ size_t totalOperationsSize{0};
+ for (auto operationIter = operationsBegin; operationIter != operationsEnd; ++operationIter) {
+ const auto& operation = *operationIter;
+
+ // Stop packing when either number of transaction operations is reached, or when the next
+ // one would make the total size of operations larger than the maximum BSON Object User
+ // Size. We rely on the headroom between BSONObjMaxUserSize and BSONObjMaxInternalSize to
+ // cover the BSON overhead and the other "applyOps" entry fields. But if a single operation
+ // in the set exceeds BSONObjMaxUserSize, we still fit it, as a single max-length operation
+ // should be able to be packed into an "applyOps" entry.
+ if (operations.size() ==
+ static_cast<size_t>(gMaxNumberOfTransactionOperationsInSingleOplogEntry) ||
+ (operations.size() > 0 &&
+ (totalOperationsSize + DurableOplogEntry::getDurableReplOperationSize(operation) >
+ BSONObjMaxUserSize))) {
+ break;
+ }
+ auto serializedOperation = operation.toBSON();
+ totalOperationsSize += static_cast<size_t>(serializedOperation.objsize());
+ operations.emplace_back(std::move(serializedOperation));
+ }
+ return operations;
+}
+
+/**
+ * Returns oplog slots to be used for "applyOps" oplog entries, BSON serialized operations, their
+ * assignments to "applyOps" entries, and oplog slots to be used for writing pre- and post- image
+ * oplog entries for the transaction consisting of 'operations'. Allocates oplog slots from
+ * 'oplogSlots'. The 'numberOfPrePostImagesToWrite' is the number of CRUD operations that have a
+ * pre-image to write as a noop oplog entry. The 'prepare' indicates if the function is called when
+ * preparing a transaction.
+ */
+OpObserver::ApplyOpsOplogSlotAndOperationAssignment
+getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& oplogSlots,
+ size_t numberOfPrePostImagesToWrite,
+ bool prepare,
+ std::vector<repl::ReplOperation>& operations) {
+ if (operations.empty()) {
+ return {{}, {}, 0 /*numberOfOplogSlotsUsed*/};
+ }
+ tassert(6278504, "Insufficient number of oplogSlots", operations.size() <= oplogSlots.size());
+
+ std::vector<OplogSlot> prePostImageOplogEntryOplogSlots;
+ std::vector<OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry> applyOpsEntries;
+ const auto operationCount = operations.size();
+ auto oplogSlotIter = oplogSlots.begin();
+ auto getNextOplogSlot = [&]() {
+ tassert(6278505, "Unexpected end of oplog slot vector", oplogSlotIter != oplogSlots.end());
+ return *oplogSlotIter++;
+ };
+
+ auto isMigratingTenant = [&opCtx]() {
+ return static_cast<bool>(repl::tenantMigrationRecipientInfo(opCtx));
+ };
+
+ // We never want to store pre-images or post-images when we're migrating oplog entries from
+ // another replica set.
+ if (numberOfPrePostImagesToWrite > 0 && !isMigratingTenant()) {
+ for (size_t operationIdx = 0; operationIdx < operationCount; ++operationIdx) {
+ auto& statement = operations[operationIdx];
+ if (statement.isChangeStreamPreImageRecordedInOplog() ||
+ (statement.isPreImageRecordedForRetryableInternalTransaction() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPreImage)) {
+ tassert(6278506, "Expected a pre-image", !statement.getPreImage().isEmpty());
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPreImageOpTime(oplogSlot);
+ }
+ if (!statement.getPostImage().isEmpty() &&
+ statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
+ auto oplogSlot = getNextOplogSlot();
+ prePostImageOplogEntryOplogSlots.push_back(oplogSlot);
+ statement.setPostImageOpTime(oplogSlot);
+ }
+ }
+ }
+
+ auto hasNeedsRetryImage = [](const repl::ReplOperation& operation) {
+ return static_cast<bool>(operation.getNeedsRetryImage());
+ };
+
+ // Assign operations to "applyOps" entries.
+ for (auto operationIt = operations.begin(); operationIt != operations.end();) {
+ auto applyOpsOperations =
+ packTransactionOperationsIntoApplyOps(operationIt, operations.end());
+ const auto opCountWithNeedsRetryImage =
+ std::count_if(operationIt, operationIt + applyOpsOperations.size(), hasNeedsRetryImage);
+ if (opCountWithNeedsRetryImage > 0) {
+ // Reserve a slot for a forged no-op entry.
+ getNextOplogSlot();
+ }
+ operationIt += applyOpsOperations.size();
+ applyOpsEntries.emplace_back(
+ OpObserver::ApplyOpsOplogSlotAndOperationAssignment::ApplyOpsEntry{
+ getNextOplogSlot(), std::move(applyOpsOperations)});
+ }
+
+ // In the special case of writing the implicit 'prepare' oplog entry, we use the last reserved
+ // oplog slot. This may mean we skipped over some reserved slots, but there's no harm in that.
+ if (prepare) {
+ applyOpsEntries.back().oplogSlot = oplogSlots.back();
+ }
+ return {std::move(prePostImageOplogEntryOplogSlots),
+ std::move(applyOpsEntries),
+ static_cast<size_t>(oplogSlotIter - oplogSlots.begin())};
+}
+
+/**
+ * Writes change stream pre-images for transaction 'operations'. The 'applyOpsOperationAssignment'
+ * contains a representation of "applyOps" entries to be written for the transaction. The
+ * 'operationTime' is wall clock time of the operations used for the pre-image documents.
+ */
+void writeChangeStreamPreImagesForTransaction(
+ OperationContext* opCtx,
+ const std::vector<repl::ReplOperation>& operations,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
+ Date_t operationTime) {
+ // This function must be called from within an outer WriteUnitOfWork so that the pre-image
+ // writes are rolled back if an exception is thrown.
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ auto applyOpsEntriesIt = applyOpsOperationAssignment.applyOpsEntries.begin();
+ for (auto operationIter = operations.begin(); operationIter != operations.end();) {
+ tassert(6278507,
+ "Unexpected end of applyOps entries vector",
+ applyOpsEntriesIt != applyOpsOperationAssignment.applyOpsEntries.end());
+ const auto& applyOpsEntry = *applyOpsEntriesIt++;
+ const auto operationSequenceEnd = operationIter + applyOpsEntry.operations.size();
+ writeChangeStreamPreImagesForApplyOpsEntries(opCtx,
+ operationIter,
+ operationSequenceEnd,
+ applyOpsEntry.oplogSlot.getTimestamp(),
+ operationTime);
+ operationIter = operationSequenceEnd;
+ }
+}
+
// Accepts an empty BSON builder and appends the given transaction statements to an 'applyOps' array
-// field. Appends as many operations as possible to the array (and their corresponding statement
-// ids to 'stmtIdsWritten') until either the constructed object exceeds the 16MB limit or the
-// maximum number of transaction statements allowed in one entry. If any of the statements has
-// a pre-image or post-image that needs to be stored in the image collection, stores it to
-// 'imageToWrite'.
-//
-// Returns an iterator to the first statement that wasn't packed into the applyOps object.
-std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
+// field (and their corresponding statement ids to 'stmtIdsWritten'). The transaction statements are
+// represented as range ['stmtBegin', 'stmtEnd') and BSON serialized objects 'operations'. If any of
+// the statements has a pre-image or post-image that needs to be stored in the image collection,
+// stores it to 'imageToWrite'.
+void packTransactionStatementsForApplyOps(
BSONObjBuilder* applyOpsBuilder,
std::vector<StmtId>* stmtIdsWritten,
boost::optional<std::pair<repl::RetryImageEnum, BSONObj>>* imageToWrite,
std::vector<repl::ReplOperation>::iterator stmtBegin,
- std::vector<repl::ReplOperation>::iterator stmtEnd) {
+ std::vector<repl::ReplOperation>::iterator stmtEnd,
+ const std::vector<BSONObj>& operations) {
+ tassert(6278508,
+ "Number of operations does not match the number of transaction statements",
+ operations.size() == static_cast<size_t>(stmtEnd - stmtBegin));
auto setImageToWrite = [&](const repl::ReplOperation& stmt) {
uassert(6054001,
str::stream() << NamespaceString::kConfigImagesNamespace
@@ -1381,21 +1541,11 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
};
std::vector<repl::ReplOperation>::iterator stmtIter;
+ auto operationsIter = operations.begin();
BSONArrayBuilder opsArray(applyOpsBuilder->subarrayStart("applyOps"_sd));
for (stmtIter = stmtBegin; stmtIter != stmtEnd; stmtIter++) {
const auto& stmt = *stmtIter;
- // Stop packing when either number of transaction operations is reached, or when the next
- // one would put the array over the maximum BSON Object User Size. We rely on the head room
- // between BSONObjMaxUserSize and BSONObjMaxInternalSize to cover the BSON overhead and the
- // other applyOps fields. But if the array with a single operation exceeds
- // BSONObjMaxUserSize, we still log it, as a single max-length operation should be able to
- // be applied.
- if (opsArray.arrSize() == gMaxNumberOfTransactionOperationsInSingleOplogEntry ||
- (opsArray.arrSize() > 0 &&
- (opsArray.len() + DurableOplogEntry::getDurableReplOperationSize(stmt) >
- BSONObjMaxUserSize)))
- break;
- opsArray.append(stmt.toBSON());
+ opsArray.append(*operationsIter++);
const auto stmtIds = stmt.getStatementIds();
stmtIdsWritten->insert(stmtIdsWritten->end(), stmtIds.begin(), stmtIds.end());
if (stmt.getNeedsRetryImage()) {
@@ -1413,7 +1563,6 @@ std::vector<repl::ReplOperation>::iterator packTransactionStatementsForApplyOps(
e.code() != ErrorCodes::BSONObjectTooLarge);
throw;
}
- return stmtIter;
}
// Logs one applyOps entry and may update the transactions table. Assumes that the given BSON
@@ -1449,7 +1598,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
try {
OpTimeBundle times;
- times.writeOpTime = logOperation(opCtx, oplogEntry);
+ times.writeOpTime = logOperation(opCtx, oplogEntry, false /*assignWallClockTime*/);
times.wallClockTime = oplogEntry->getWallClockTime();
if (updateTxnTable) {
SessionTxnRecord sessionTxnRecord;
@@ -1487,6 +1636,10 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
// number of applyOps written will be used. It also expects that the vector of given statements is
// non-empty.
//
+// The 'applyOpsOperationAssignment' contains BSON serialized transaction statements, their
+// assignment to "applyOps" oplog entries, and oplog slots to be used for writing pre- and post-
+// image oplog entries for a transaction.
+//
// In the case of writing entries for a prepared transaction, the last oplog entry (i.e. the
// implicit prepare) will always be written using the last oplog slot given, even if this means
// skipping over some reserved slots.
@@ -1496,11 +1649,12 @@ int logOplogEntriesForTransaction(
OperationContext* opCtx,
std::vector<repl::ReplOperation>* stmts,
const std::vector<OplogSlot>& oplogSlots,
+ const OpObserver::ApplyOpsOplogSlotAndOperationAssignment& applyOpsOperationAssignment,
boost::optional<ImageBundle>* prePostImageToWriteToImageCollection,
size_t numberOfPrePostImagesToWrite,
- bool prepare) {
+ bool prepare,
+ Date_t wallClockTime) {
invariant(!stmts->empty());
- invariant(stmts->size() <= oplogSlots.size());
// Storage transaction commit is the last place inside a transaction that can throw an
// exception. In order to safely allow exceptions to be thrown at that point, this function must
@@ -1516,15 +1670,17 @@ int logOplogEntriesForTransaction(
invariant(opCtx->lockState()->isWriteLocked());
prevWriteOpTime.writeOpTime = txnParticipant.getLastWriteOpTime();
- auto currOplogSlot = oplogSlots.begin();
+ auto currPrePostImageOplogEntryOplogSlot =
+ applyOpsOperationAssignment.prePostImageOplogEntryOplogSlots.begin();
+
// We never want to store pre-images or post-images when we're migrating oplog entries from
// another replica set.
const auto& migrationRecipientInfo = repl::tenantMigrationRecipientInfo(opCtx);
auto logPrePostImageNoopEntry = [&](const repl::ReplOperation& statement,
const BSONObj& imageDoc) {
- auto slot = *currOplogSlot;
- ++currOplogSlot;
+ auto slot = *currPrePostImageOplogEntryOplogSlot;
+ ++currPrePostImageOplogEntryOplogSlot;
MutableOplogEntry imageEntry;
imageEntry.setSessionId(*opCtx->getLogicalSessionId());
@@ -1537,7 +1693,7 @@ int logOplogEntriesForTransaction(
imageEntry.setOpTime(slot);
imageEntry.setDestinedRecipient(statement.getDestinedRecipient());
- return logOperation(opCtx, &imageEntry);
+ logOperation(opCtx, &imageEntry);
};
if (numberOfPrePostImagesToWrite > 0 && !migrationRecipientInfo) {
@@ -1551,16 +1707,14 @@ int logOplogEntriesForTransaction(
// image collection. Therefore, when 'needsRetryImage' is equal to kPreImage, the
// pre-image will be written to the image collection (after all the applyOps oplog
// entries are written).
- auto opTime = logPrePostImageNoopEntry(statement, statement.getPreImage());
- statement.setPreImageOpTime(opTime);
+ logPrePostImageNoopEntry(statement, statement.getPreImage());
}
if (!statement.getPostImage().isEmpty() &&
statement.getNeedsRetryImage() != repl::RetryImageEnum::kPostImage) {
// Likewise, when 'needsRetryImage' is equal to kPostImage, the post-image will be
// written to the image collection (after all the applyOps oplog entries are
// written).
- auto opTime = logPrePostImageNoopEntry(statement, statement.getPostImage());
- statement.setPostImageOpTime(opTime);
+ logPrePostImageNoopEntry(statement, statement.getPostImage());
}
}
}
@@ -1573,12 +1727,22 @@ int logOplogEntriesForTransaction(
// statements have been packed, it should point to stmts.end(), which is the loop's
// termination condition.
auto stmtsIter = stmts->begin();
+ auto applyOpsIter = applyOpsOperationAssignment.applyOpsEntries.begin();
while (stmtsIter != stmts->end()) {
+ tassert(6278509,
+ "Not enough \"applyOps\" entries",
+ applyOpsIter != applyOpsOperationAssignment.applyOpsEntries.end());
+ auto& applyOpsEntry = *applyOpsIter++;
BSONObjBuilder applyOpsBuilder;
boost::optional<std::pair<repl::RetryImageEnum, BSONObj>> imageToWrite;
- auto nextStmt = packTransactionStatementsForApplyOps(
- &applyOpsBuilder, &stmtIdsWritten, &imageToWrite, stmtsIter, stmts->end());
+ const auto nextStmt = stmtsIter + applyOpsEntry.operations.size();
+ packTransactionStatementsForApplyOps(&applyOpsBuilder,
+ &stmtIdsWritten,
+ &imageToWrite,
+ stmtsIter,
+ nextStmt,
+ applyOpsEntry.operations);
// If we packed the last op, then the next oplog entry we log should be the implicit
// commit or implicit prepare, i.e. we omit the 'partialTxn' field.
@@ -1590,15 +1754,12 @@ int logOplogEntriesForTransaction(
auto isPartialTxn = !lastOp;
if (imageToWrite) {
- // Reserve an oplog slot for potential forged noop oplog entry for the pre-image or
- // post-image.
uassert(6054002,
str::stream() << NamespaceString::kConfigImagesNamespace
<< " can only store the pre or post image of one "
"findAndModify operation for each "
"transaction",
!(*prePostImageToWriteToImageCollection));
- ++currOplogSlot;
}
if (isPartialTxn || (imageToWrite && !prepare)) {
@@ -1629,12 +1790,6 @@ int logOplogEntriesForTransaction(
// the first and last op.
auto updateTxnTable = firstOp || lastOp;
- // Use the next reserved oplog slot. In the special case of writing the implicit
- // 'prepare' oplog entry, we use the last reserved oplog slot, since callers of this
- // function will expect that timestamp to be used as the 'prepare' timestamp. This
- // may mean we skipped over some reserved slots, but there's no harm in that.
- auto oplogSlot = implicitPrepare ? oplogSlots.back() : *currOplogSlot++;
-
// The first optime of the transaction is always the first oplog slot, except in the
// case of a single prepare oplog entry.
auto firstOpTimeOfTxn =
@@ -1646,8 +1801,9 @@ int logOplogEntriesForTransaction(
auto startOpTime = boost::make_optional(!implicitCommit, firstOpTimeOfTxn);
MutableOplogEntry oplogEntry;
- oplogEntry.setOpTime(oplogSlot);
+ oplogEntry.setOpTime(applyOpsEntry.oplogSlot);
oplogEntry.setPrevWriteOpTimeInTransaction(prevWriteOpTime.writeOpTime);
+ oplogEntry.setWallClockTime(wallClockTime);
oplogEntry.setObject(applyOpsBuilder.done());
auto txnState = isPartialTxn
? DurableTxnStateEnum::kInProgress
@@ -1670,15 +1826,11 @@ int logOplogEntriesForTransaction(
prevWriteOpTime.writeOpTime.getTimestamp()};
}
- const auto applyOpsEntryTimestamp = prevWriteOpTime.writeOpTime.getTimestamp();
- writeChangeStreamPreImagesForApplyOpsEntries(
- opCtx, stmtsIter, nextStmt, applyOpsEntryTimestamp, oplogEntry.getWallClockTime());
-
// Advance the iterator to the beginning of the remaining unpacked statements.
stmtsIter = nextStmt;
}
- return currOplogSlot - oplogSlots.begin();
+ return applyOpsOperationAssignment.numberOfOplogSlotsUsed;
}
void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
@@ -1759,10 +1911,27 @@ void OpObserverImpl::onUnpreparedTransactionCommit(OperationContext* opCtx,
uasserted(51268, "hangAndFailUnpreparedCommitAfterReservingOplogSlot fail point enabled");
}
+ // Serialize transaction statements to BSON and determine their assignment to "applyOps"
+ // entries.
+ const auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, oplogSlots, numberOfPrePostImagesToWrite, false /*prepare*/, *statements);
+
+ const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+
// Log in-progress entries for the transaction along with the implicit commit.
boost::optional<ImageBundle> imageToWrite;
- int numOplogEntries = logOplogEntriesForTransaction(
- opCtx, statements, oplogSlots, &imageToWrite, numberOfPrePostImagesToWrite, false);
+ int numOplogEntries = logOplogEntriesForTransaction(opCtx,
+ statements,
+ oplogSlots,
+ applyOpsOplogSlotAndOperationAssignment,
+ &imageToWrite,
+ numberOfPrePostImagesToWrite,
+ false /* prepare*/,
+ wallClockTime);
+
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -1799,14 +1968,35 @@ void OpObserverImpl::onPreparedTransactionCommit(
logCommitOrAbortForPreparedTransaction(opCtx, &oplogEntry, DurableTxnStateEnum::kCommitted);
}
-void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) {
+std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>
+OpObserverImpl::preTransactionPrepare(OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) {
+ auto applyOpsOplogSlotAndOperationAssignment =
+ getApplyOpsOplogSlotAndOperationAssignmentForTransaction(
+ opCtx, reservedSlots, numberOfPrePostImagesToWrite, true /*prepare*/, *statements);
+ writeChangeStreamPreImagesForTransaction(
+ opCtx, *statements, applyOpsOplogSlotAndOperationAssignment, wallClockTime);
+ return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>(
+ std::move(applyOpsOplogSlotAndOperationAssignment));
+}
+
+void OpObserverImpl::onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) {
invariant(!reservedSlots.empty());
const auto prepareOpTime = reservedSlots.back();
invariant(opCtx->getTxnNumber());
invariant(!prepareOpTime.isNull());
+ tassert(6278510,
+ "Operation assignments to applyOps entries should be present",
+ applyOpsOperationAssignment);
// Don't write oplog entry on secondaries.
if (!opCtx->writesAreReplicated()) {
@@ -1837,9 +2027,11 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
logOplogEntriesForTransaction(opCtx,
statements,
reservedSlots,
+ *applyOpsOperationAssignment,
&imageToWrite,
numberOfPrePostImagesToWrite,
- true /* prepare */);
+ true /* prepare */,
+ wallClockTime);
if (imageToWrite) {
writeToImageCollection(opCtx,
*opCtx->getLogicalSessionId(),
@@ -1861,6 +2053,7 @@ void OpObserverImpl::onTransactionPrepare(OperationContext* opCtx,
oplogEntry.setOpTime(oplogSlot);
oplogEntry.setPrevWriteOpTimeInTransaction(repl::OpTime());
oplogEntry.setObject(applyOpsBuilder.done());
+ oplogEntry.setWallClockTime(wallClockTime);
logApplyOpsForTransaction(opCtx,
&oplogEntry,
DurableTxnStateEnum::kPrepared,
diff --git a/src/mongo/db/op_observer_impl.h b/src/mongo/db/op_observer_impl.h
index f4177ad608e..7d40fe84307 100644
--- a/src/mongo/db/op_observer_impl.h
+++ b/src/mongo/db/op_observer_impl.h
@@ -192,10 +192,21 @@ public:
OplogSlot commitOplogEntryOpTime,
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final;
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final;
+
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final;
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final;
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final;
void onMajorityCommitPointUpdate(ServiceContext* service,
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index f37e9e7cada..4b2be83993e 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -865,6 +865,22 @@ protected:
return *_txnParticipant;
}
+ void prepareTransaction(const std::vector<OplogSlot>& reservedSlots,
+ repl::OpTime prepareOpTime,
+ size_t numberOfPrePostImagesToWrite = 0) {
+ auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
+ auto currentTime = Date_t::now();
+ auto applyOpsAssignment = opObserver().preTransactionPrepare(
+ opCtx(), reservedSlots, numberOfPrePostImagesToWrite, currentTime, &txnOps);
+ opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
+ opObserver().onTransactionPrepare(opCtx(),
+ reservedSlots,
+ &txnOps,
+ applyOpsAssignment.get(),
+ numberOfPrePostImagesToWrite,
+ currentTime);
+ }
+
private:
class ExposeOpObserverTimes : public OpObserver {
public:
@@ -1008,9 +1024,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPrepareTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 5);
auto prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -1069,8 +1083,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
const auto prepareSlot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot);
prepareTimestamp = prepareSlot.getTimestamp();
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0);
+ prepareTransaction({prepareSlot}, prepareSlot);
commitSlot = repl::getNextOpTime(opCtx());
}
@@ -1138,8 +1151,7 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
const auto prepareSlot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), prepareSlot);
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {prepareSlot}, &txnOps, 0);
+ prepareTransaction({prepareSlot}, prepareSlot);
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -1218,9 +1230,7 @@ TEST_F(OpObserverTransactionTest,
WriteUnitOfWork wuow(opCtx());
prepareOpTime = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {prepareOpTime}, &txnOps, 0);
+ prepareTransaction({prepareOpTime}, prepareOpTime);
}
auto oplogEntryObj = getSingleOplogEntry(opCtx());
@@ -1251,9 +1261,7 @@ TEST_F(OpObserverTransactionTest, PreparingTransactionWritesToTransactionTable)
OplogSlot slot = repl::getNextOpTime(opCtx());
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
prepareOpTime = slot;
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0);
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp());
+ prepareTransaction({slot}, prepareOpTime);
}
ASSERT_EQ(prepareOpTime.getTimestamp(), opCtx()->recoveryUnit()->getPrepareTimestamp());
@@ -1284,9 +1292,7 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
Lock::GlobalLock lk(opCtx(), MODE_IX);
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0);
+ prepareTransaction({slot}, slot);
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -1358,9 +1364,7 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
WriteUnitOfWork wuow(opCtx());
OplogSlot slot = repl::getNextOpTime(opCtx());
prepareOpTime = slot;
- opCtx()->recoveryUnit()->setPrepareTimestamp(slot.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), {slot}, &txnOps, 0);
+ prepareTransaction({slot}, slot);
txnParticipant.transitionToPreparedforTest(opCtx(), slot);
}
@@ -1564,7 +1568,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant.unstashTransactionResources(opCtx(), "insert");
std::vector<InsertStatement> inserts;
- inserts.emplace_back(0, BSON("_id" << 0));
+ inserts.emplace_back(0, BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a')));
WriteUnitOfWork wuow(opCtx());
AutoGetCollection autoColl1(opCtx(), nss, MODE_IX);
@@ -1579,11 +1583,12 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
ASSERT_EQ(repl::OpTime(), *oplogEntry.getPrevWriteOpTimeInTransaction());
// The implicit commit oplog entry.
- auto oExpected =
- BSON("applyOps" << BSON_ARRAY(BSON("op"
- << "i"
- << "ns" << nss.toString() << "ui" << uuid << "o"
- << BSON("_id" << 0) << "o2" << BSON("_id" << 0))));
+ auto oExpected = BSON("applyOps" << BSON_ARRAY(BSON(
+ "op"
+ << "i"
+ << "ns" << nss.toString() << "ui" << uuid << "o"
+ << BSON("_id" << 0 << "a" << std::string(BSONObjMaxUserSize, 'a'))
+ << "o2" << BSON("_id" << 0))));
ASSERT_BSONOBJ_EQ(oExpected, oplogEntry.getObject());
}
@@ -1761,14 +1766,14 @@ public:
protected:
void commit() final {
- const auto prepareSlot = repl::getNextOpTime(opCtx());
+ const auto reservedOplogSlots = repl::getNextOpTimes(
+ opCtx(), 1 + txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ invariant(reservedOplogSlots.size() >= 1);
+ const auto prepareSlot = reservedOplogSlots.back();
txnParticipant().transitionToPreparedforTest(opCtx(), prepareSlot);
- auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(
- opCtx(),
- {prepareSlot},
- &txnOps,
- txnParticipant().getNumberOfPrePostImagesToWriteForTest());
+ prepareTransaction(reservedOplogSlots,
+ prepareSlot,
+ txnParticipant().getNumberOfPrePostImagesToWriteForTest());
};
BSONObj assertGetSingleOplogEntry() final {
@@ -2874,9 +2879,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPreImageTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 4);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 2);
+ prepareTransaction(reservedSlots, prepareOpTime, 2);
}
auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
@@ -3001,9 +3004,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertPrepareTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 4);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
}
auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
std::vector<OplogEntry> oplogEntries;
@@ -3093,9 +3094,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdatePrepareTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
std::vector<OplogEntry> oplogEntries;
@@ -3169,9 +3168,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeletePrepareTest) {
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
prepareOpTime = reservedSlots.back();
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
}
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
@@ -3232,10 +3229,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
-
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
}
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
@@ -3315,10 +3309,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, AbortPreparedTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 1);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
-
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
}
auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
@@ -3448,9 +3439,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, PreparedTransactionPackingTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 4);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
auto oplogEntryObj = getSingleOplogEntry(opCtx());
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -3505,10 +3494,7 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) {
auto reservedSlots = repl::getNextOpTimes(opCtx(), 2);
prepareOpTime = reservedSlots.back();
txnParticipant.transitionToPreparedforTest(opCtx(), prepareOpTime);
-
- opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
- auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onTransactionPrepare(opCtx(), reservedSlots, &txnOps, 0);
+ prepareTransaction(reservedSlots, prepareOpTime);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
diff --git a/src/mongo/db/op_observer_noop.h b/src/mongo/db/op_observer_noop.h
index f458d891846..ab610264ce1 100644
--- a/src/mongo/db/op_observer_noop.h
+++ b/src/mongo/db/op_observer_noop.h
@@ -166,10 +166,21 @@ public:
OplogSlot commitOplogEntryOpTime,
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override{};
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override{};
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override {
+ return nullptr;
+ }
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override{};
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override{};
void onMajorityCommitPointUpdate(ServiceContext* service,
diff --git a/src/mongo/db/op_observer_registry.h b/src/mongo/db/op_observer_registry.h
index c5f68467066..5186d6a3f47 100644
--- a/src/mongo/db/op_observer_registry.h
+++ b/src/mongo/db/op_observer_registry.h
@@ -384,14 +384,42 @@ public:
opCtx, commitOplogEntryOpTime, commitTimestamp, statements);
}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override {
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override {
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment>
+ applyOpsOplogSlotAndOperationAssignment;
+ for (auto&& observer : _observers) {
+ auto applyOpsAssignment = observer->preTransactionPrepare(
+ opCtx, reservedSlots, numberOfPrePostImagesToWrite, wallClockTime, statements);
+ tassert(6278501,
+ "More than one OpObserver returned operation to \"applyOps\" assignment",
+ !(applyOpsAssignment && applyOpsOplogSlotAndOperationAssignment));
+ if (applyOpsAssignment) {
+ applyOpsOplogSlotAndOperationAssignment = std::move(applyOpsAssignment);
+ }
+ }
+ return applyOpsOplogSlotAndOperationAssignment;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override {
ReservedTimes times{opCtx};
for (auto& observer : _observers) {
- observer->onTransactionPrepare(
- opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite);
+ observer->onTransactionPrepare(opCtx,
+ reservedSlots,
+ statements,
+ applyOpsOperationAssignment,
+ numberOfPrePostImagesToWrite,
+ wallClockTime);
}
}
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
index 9027c8d52de..91f99d83668 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.cpp
@@ -33,18 +33,22 @@
#include "change_stream_expired_pre_image_remover.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/catalog/collection.h"
#include "mongo/db/change_stream_options_manager.h"
#include "mongo/db/client.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
+#include "mongo/db/namespace_string.h"
#include "mongo/db/pipeline/change_stream_preimage_gen.h"
#include "mongo/db/query/internal_plans.h"
#include "mongo/db/record_id_helpers.h"
-#include "mongo/db/repl/oplog_entry_gen.h"
+#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/repl/storage_interface.h"
#include "mongo/db/service_context.h"
#include "mongo/logv2/log.h"
-#include "mongo/s/cluster_commands_helpers.h"
+#include "mongo/util/background.h"
+#include "mongo/util/concurrency/idle_thread_block.h"
#include "mongo/util/fail_point.h"
namespace mongo {
@@ -374,66 +378,152 @@ void deleteExpiredChangeStreamPreImages(Client* client, Date_t currentTimeForTim
"numberOfRemovals"_attr = numberOfRemovals,
"jobDuration"_attr = (Date_t::now() - startTime).toString());
}
-} // namespace
-PeriodicChangeStreamExpiredPreImagesRemover& PeriodicChangeStreamExpiredPreImagesRemover::get(
- ServiceContext* serviceContext) {
- auto& jobContainer = _serviceDecoration(serviceContext);
- jobContainer._init(serviceContext);
- return jobContainer;
+void performExpiredChangeStreamPreImagesRemovalPass(Client* client) {
+ try {
+ Date_t currentTimeForTimeBasedExpiration = Date_t::now();
+
+ changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
+ // Populate the current time for time based expiration of pre-images.
+ if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
+ const BSONType bsonType = currentTimeElem.type();
+ tassert(5869300,
+ str::stream() << "Expected type for 'currentTimeForTimeBasedExpiration' is "
+ "'date', but found: "
+ << bsonType,
+ bsonType == BSONType::Date);
+
+ currentTimeForTimeBasedExpiration = currentTimeElem.Date();
+ }
+ });
+ deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
+ } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
+ LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
+ } catch (const DBException& exception) {
+ LOGV2_ERROR(5869106,
+ "Periodic expired pre-images removal job failed",
+ "reason"_attr = exception.reason());
+ }
}
+} // namespace
-PeriodicJobAnchor& PeriodicChangeStreamExpiredPreImagesRemover::operator*() const noexcept {
- stdx::lock_guard lk(_mutex);
- return *_anchor;
-}
+class ChangeStreamExpiredPreImagesRemover;
-PeriodicJobAnchor* PeriodicChangeStreamExpiredPreImagesRemover::operator->() const noexcept {
- stdx::lock_guard lk(_mutex);
- return _anchor.get();
-}
+namespace {
+const auto getChangeStreamExpiredPreImagesRemover =
+ ServiceContext::declareDecoration<std::unique_ptr<ChangeStreamExpiredPreImagesRemover>>();
+} // namespace
-void PeriodicChangeStreamExpiredPreImagesRemover::_init(ServiceContext* serviceContext) {
- stdx::lock_guard lk(_mutex);
- if (_anchor) {
- return;
+/**
+ * A periodic background job that removes expired change stream pre-image documents from the
+ * 'system.preimages' collection. The period of the job is controlled by the server parameter
+ * "expiredChangeStreamPreImageRemovalJobSleepSecs".
+ */
+class ChangeStreamExpiredPreImagesRemover : public BackgroundJob {
+public:
+ explicit ChangeStreamExpiredPreImagesRemover() : BackgroundJob(false /* selfDelete */) {}
+
+ /**
+ * Retrieves ChangeStreamExpiredPreImagesRemover from the service context 'serviceCtx'.
+ */
+ static ChangeStreamExpiredPreImagesRemover* get(ServiceContext* serviceCtx) {
+ return getChangeStreamExpiredPreImagesRemover(serviceCtx).get();
}
- auto periodicRunner = serviceContext->getPeriodicRunner();
- invariant(periodicRunner);
-
- PeriodicRunner::PeriodicJob job(
- "ChangeStreamExpiredPreImagesRemover",
- [](Client* client) {
- try {
- Date_t currentTimeForTimeBasedExpiration = Date_t::now();
-
- changeStreamPreImageRemoverCurrentTime.execute([&](const BSONObj& data) {
- // Populate the current time for time based expiration of pre-images.
- if (auto currentTimeElem = data["currentTimeForTimeBasedExpiration"]) {
- const BSONType bsonType = currentTimeElem.type();
- tassert(5869300,
- str::stream()
- << "Expected type for 'currentTimeForTimeBasedExpiration' is "
- "'date', but found: "
- << bsonType,
- bsonType == BSONType::Date);
-
- currentTimeForTimeBasedExpiration = currentTimeElem.Date();
- }
- });
-
- deleteExpiredChangeStreamPreImages(client, currentTimeForTimeBasedExpiration);
- } catch (const ExceptionForCat<ErrorCategory::Interruption>&) {
- LOGV2_WARNING(5869105, "Periodic expired pre-images removal job was interrupted");
- } catch (const DBException& exception) {
- LOGV2_ERROR(5869106,
- "Periodic expired pre-images removal job failed",
- "reason"_attr = exception.reason());
+ /**
+ * Sets ChangeStreamExpiredPreImagesRemover 'preImagesRemover' to the service context
+ * 'serviceCtx'.
+ */
+ static void set(ServiceContext* serviceCtx,
+ std::unique_ptr<ChangeStreamExpiredPreImagesRemover> preImagesRemover) {
+ auto& changeStreamExpiredPreImagesRemover =
+ getChangeStreamExpiredPreImagesRemover(serviceCtx);
+ if (changeStreamExpiredPreImagesRemover) {
+ invariant(!changeStreamExpiredPreImagesRemover->running(),
+ "Tried to reset the ChangeStreamExpiredPreImagesRemover without shutting "
+ "down the original instance.");
+ }
+
+ invariant(preImagesRemover);
+ changeStreamExpiredPreImagesRemover = std::move(preImagesRemover);
+ }
+
+ std::string name() const {
+ return "ChangeStreamExpiredPreImagesRemover";
+ }
+
+ void run() {
+ ThreadClient tc(name(), getGlobalServiceContext());
+ AuthorizationSession::get(cc())->grantInternalAuthorization(&cc());
+
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc.get()->setSystemOperationKillableByStepdown(lk);
+ }
+
+ while (true) {
+ LOGV2_DEBUG(6278517, 3, "Thread awake");
+ auto iterationStartTime = Date_t::now();
+ performExpiredChangeStreamPreImagesRemovalPass(tc.get());
+ {
+ // Wait until either gExpiredChangeStreamPreImageRemovalJobSleepSecs passes or a
+ // shutdown is requested.
+ auto deadline = iterationStartTime +
+ Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load());
+ stdx::unique_lock<Latch> lk(_stateMutex);
+
+ MONGO_IDLE_THREAD_BLOCK;
+ _shuttingDownCV.wait_until(
+ lk, deadline.toSystemTimePoint(), [&] { return _shuttingDown; });
+
+ if (_shuttingDown) {
+ return;
+ }
}
- },
- Seconds(gExpiredChangeStreamPreImageRemovalJobSleepSecs.load()));
+ }
+ }
+
+ /**
+ * Signals the thread to quit and then waits until it does.
+ */
+ void shutdown() {
+ LOGV2(6278515, "Shutting down Change Stream Expired Pre-images Remover thread");
+ {
+ stdx::lock_guard<Latch> lk(_stateMutex);
+ _shuttingDown = true;
+ }
+ _shuttingDownCV.notify_one();
+ wait();
+ LOGV2(6278516, "Finished shutting down Change Stream Expired Pre-images Remover thread");
+ }
- _anchor = std::make_shared<PeriodicJobAnchor>(periodicRunner->makeJob(std::move(job)));
+private:
+ // Protects the state below.
+ mutable Mutex _stateMutex = MONGO_MAKE_LATCH("ChangeStreamExpiredPreImagesRemoverStateMutex");
+
+ // Signaled to wake up the thread, if the thread is waiting. The thread will check whether
+ // _shuttingDown is set and stop accordingly.
+ mutable stdx::condition_variable _shuttingDownCV;
+
+ bool _shuttingDown = false;
+};
+
+void startChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext) {
+ std::unique_ptr<ChangeStreamExpiredPreImagesRemover> changeStreamExpiredPreImagesRemover =
+ std::make_unique<ChangeStreamExpiredPreImagesRemover>();
+ changeStreamExpiredPreImagesRemover->go();
+ ChangeStreamExpiredPreImagesRemover::set(serviceContext,
+ std::move(changeStreamExpiredPreImagesRemover));
}
+
+void shutdownChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext) {
+ ChangeStreamExpiredPreImagesRemover* changeStreamExpiredPreImagesRemover =
+ ChangeStreamExpiredPreImagesRemover::get(serviceContext);
+ // We allow the ChangeStreamExpiredPreImagesRemover not to be set in case shutdown occurs before
+ // the thread has been initialized.
+ if (changeStreamExpiredPreImagesRemover) {
+ changeStreamExpiredPreImagesRemover->shutdown();
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
index 57f6442328f..89849552b0a 100644
--- a/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
+++ b/src/mongo/db/pipeline/change_stream_expired_pre_image_remover.h
@@ -31,12 +31,8 @@
#include "mongo/db/commands/change_stream_options_gen.h"
#include "mongo/db/service_context.h"
-#include "mongo/platform/mutex.h"
-#include "mongo/util/hierarchical_acquisition.h"
-#include "mongo/util/periodic_runner.h"
namespace mongo {
-
namespace preImageRemoverInternal {
/**
@@ -59,28 +55,14 @@ boost::optional<Date_t> getPreImageExpirationTime(OperationContext* opCtx, Date_
} // namespace preImageRemoverInternal
-class ServiceContext;
-
/**
- * A periodic background job to remove expired documents from 'system.preimages' collection. A
- * document in this collection is considered expired if its corresponding oplog entry falls off the
- * oplog.
+ * Starts a periodic background job to remove expired documents from 'system.preimages' collection.
*/
-class PeriodicChangeStreamExpiredPreImagesRemover final {
-public:
- static PeriodicChangeStreamExpiredPreImagesRemover& get(ServiceContext* serviceContext);
-
- PeriodicJobAnchor& operator*() const noexcept;
- PeriodicJobAnchor* operator->() const noexcept;
-
-private:
- void _init(ServiceContext* serviceContext);
+void startChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext);
- inline static const auto _serviceDecoration =
- ServiceContext::declareDecoration<PeriodicChangeStreamExpiredPreImagesRemover>();
-
- mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1),
- "PeriodicChangeStreamExpiredPreImagesRemover::_mutex");
- std::shared_ptr<PeriodicJobAnchor> _anchor;
-};
+/**
+ * Stops the periodic background job that removes expired documents from 'system.preimages'
+ * collection.
+ */
+void shutdownChangeStreamExpiredPreImagesRemover(ServiceContext* serviceContext);
} // namespace mongo
diff --git a/src/mongo/db/repl/primary_only_service_op_observer.h b/src/mongo/db/repl/primary_only_service_op_observer.h
index f434e29e34d..a81b66e04cd 100644
--- a/src/mongo/db/repl/primary_only_service_op_observer.h
+++ b/src/mongo/db/repl/primary_only_service_op_observer.h
@@ -190,10 +190,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/repl/tenant_migration_donor_op_observer.h b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
index 90e3a49c743..6ab805ee967 100644
--- a/src/mongo/db/repl/tenant_migration_donor_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_donor_op_observer.h
@@ -187,10 +187,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
index 76567db5b95..b7b0a88ca96 100644
--- a/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
+++ b/src/mongo/db/repl/tenant_migration_recipient_op_observer.h
@@ -189,10 +189,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/s/config_server_op_observer.h b/src/mongo/db/s/config_server_op_observer.h
index 71ba94767a9..776ce97af3d 100644
--- a/src/mongo/db/s/config_server_op_observer.h
+++ b/src/mongo/db/s/config_server_op_observer.h
@@ -190,10 +190,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/resharding/resharding_op_observer.h b/src/mongo/db/s/resharding/resharding_op_observer.h
index 88101c36b32..be4dfc18505 100644
--- a/src/mongo/db/s/resharding/resharding_op_observer.h
+++ b/src/mongo/db/s/resharding/resharding_op_observer.h
@@ -210,10 +210,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/s/shard_server_op_observer.h b/src/mongo/db/s/shard_server_op_observer.h
index 8dabf371829..684ef8153bf 100644
--- a/src/mongo/db/s/shard_server_op_observer.h
+++ b/src/mongo/db/s/shard_server_op_observer.h
@@ -189,10 +189,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept override {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) override {}
diff --git a/src/mongo/db/serverless/shard_split_donor_op_observer.h b/src/mongo/db/serverless/shard_split_donor_op_observer.h
index 1b24dab4140..27b05527cce 100644
--- a/src/mongo/db/serverless/shard_split_donor_op_observer.h
+++ b/src/mongo/db/serverless/shard_split_donor_op_observer.h
@@ -186,10 +186,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index f154c9695d2..78864240462 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1613,11 +1613,24 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
hangAfterReservingPrepareTimestamp.pauseWhileSet();
}
}
+ auto opObserver = opCtx->getServiceContext()->getOpObserver();
+ const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now();
+ auto applyOpsOplogSlotAndOperationAssignment =
+ opObserver->preTransactionPrepare(opCtx,
+ reservedSlots,
+ p().numberOfPrePostImagesToWrite,
+ wallClockTime,
+ &completedTransactionOperations);
+
opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp());
opCtx->getWriteUnitOfWork()->prepare();
p().needToWriteAbortEntry = true;
- opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(
- opCtx, reservedSlots, &completedTransactionOperations, p().numberOfPrePostImagesToWrite);
+ opObserver->onTransactionPrepare(opCtx,
+ reservedSlots,
+ &completedTransactionOperations,
+ applyOpsOplogSlotAndOperationAssignment.get(),
+ p().numberOfPrePostImagesToWrite,
+ wallClockTime);
abortGuard.dismiss();
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index b2b57970e94..eedc392c560 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -93,13 +93,20 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override {
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(
- opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite);
+ OpObserverNoop::onTransactionPrepare(opCtx,
+ reservedSlots,
+ statements,
+ applyOpsOperationAssignment,
+ numberOfPrePostImagesToWrite,
+ wallClockTime);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 6d2df9f6ec5..60d1a24f408 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -103,10 +103,20 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) override;
+ std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) override;
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) override;
bool onTransactionPrepareThrowsException = false;
bool transactionPrepared = false;
@@ -148,13 +158,30 @@ public:
const repl::OpTime dropOpTime = {Timestamp(Seconds(100), 1U), 1LL};
};
-void OpObserverMock::onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) {
+std::unique_ptr<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>
+OpObserverMock::preTransactionPrepare(OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) {
+ return std::make_unique<OpObserver::ApplyOpsOplogSlotAndOperationAssignment>(
+ OpObserver::ApplyOpsOplogSlotAndOperationAssignment{{}, {}});
+}
+
+void OpObserverMock::onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(
- opCtx, reservedSlots, statements, numberOfPrePostImagesToWrite);
+ OpObserverNoop::onTransactionPrepare(opCtx,
+ reservedSlots,
+ statements,
+ applyOpsOperationAssignment,
+ numberOfPrePostImagesToWrite,
+ wallClockTime);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
diff --git a/src/mongo/db/user_write_block_mode_op_observer.h b/src/mongo/db/user_write_block_mode_op_observer.h
index d5ad0a2ecaa..adef807ed97 100644
--- a/src/mongo/db/user_write_block_mode_op_observer.h
+++ b/src/mongo/db/user_write_block_mode_op_observer.h
@@ -197,10 +197,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}
diff --git a/src/mongo/idl/cluster_server_parameter_op_observer.h b/src/mongo/idl/cluster_server_parameter_op_observer.h
index 4519de522a7..932e99f44a3 100644
--- a/src/mongo/idl/cluster_server_parameter_op_observer.h
+++ b/src/mongo/idl/cluster_server_parameter_op_observer.h
@@ -201,10 +201,22 @@ public:
Timestamp commitTimestamp,
const std::vector<repl::ReplOperation>& statements) noexcept final {}
- void onTransactionPrepare(OperationContext* opCtx,
- const std::vector<OplogSlot>& reservedSlots,
- std::vector<repl::ReplOperation>* statements,
- size_t numberOfPrePostImagesToWrite) final {}
+ std::unique_ptr<ApplyOpsOplogSlotAndOperationAssignment> preTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime,
+ std::vector<repl::ReplOperation>* statements) final {
+ return nullptr;
+ }
+
+ void onTransactionPrepare(
+ OperationContext* opCtx,
+ const std::vector<OplogSlot>& reservedSlots,
+ std::vector<repl::ReplOperation>* statements,
+ const ApplyOpsOplogSlotAndOperationAssignment* applyOpsOperationAssignment,
+ size_t numberOfPrePostImagesToWrite,
+ Date_t wallClockTime) final {}
void onTransactionAbort(OperationContext* opCtx,
boost::optional<OplogSlot> abortOplogEntryOpTime) final {}