summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-09-15 18:35:00 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-15 23:37:30 +0000
commitf4b9d3b6d94765c1bd404ef42cf8fc130b7d9e31 (patch)
tree79b91dedd95acd161fe78f24d865c4490fa42f29
parent6045fe2ee565f3eac4cccea8f5ff5aeff6e6f156 (diff)
downloadmongo-f4b9d3b6d94765c1bd404ef42cf8fc130b7d9e31.tar.gz
SERVER-68860 integrate TransactionOperations into TransactionParticipant
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp69
-rw-r--r--src/mongo/db/transaction/transaction_participant.h18
2 files changed, 23 insertions, 64 deletions
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 1a811e6f68b..73dd77aac99 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -903,7 +903,7 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction(
tickSource,
now,
*o().transactionExpireDate);
- invariant(p().transactionOperations.empty());
+ invariant(p().transactionOperations.isEmpty());
}
}
@@ -1645,7 +1645,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
// Even if the prepared transaction contained no statements, we always reserve at least
// 1 oplog slot for the prepare oplog entry.
auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->size();
- numSlotsToReserve += p().numberOfPrePostImagesToWrite;
+ numSlotsToReserve += p().transactionOperations.getNumberOfPrePostImagesToWrite();
oplogSlotReserver.emplace(opCtx, std::max(1, static_cast<int>(numSlotsToReserve)));
invariant(oplogSlotReserver->getSlots().size() >= 1);
prepareOplogSlot = oplogSlotReserver->getLastSlot();
@@ -1671,12 +1671,12 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
}
auto opObserver = opCtx->getServiceContext()->getOpObserver();
const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now();
- auto applyOpsOplogSlotAndOperationAssignment =
- opObserver->preTransactionPrepare(opCtx,
- reservedSlots,
- p().numberOfPrePostImagesToWrite,
- wallClockTime,
- completedTransactionOperations);
+ auto applyOpsOplogSlotAndOperationAssignment = opObserver->preTransactionPrepare(
+ opCtx,
+ reservedSlots,
+ p().transactionOperations.getNumberOfPrePostImagesToWrite(),
+ wallClockTime,
+ completedTransactionOperations);
opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp());
opCtx->getWriteUnitOfWork()->prepare();
@@ -1685,7 +1685,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
reservedSlots,
completedTransactionOperations,
applyOpsOplogSlotAndOperationAssignment.get(),
- p().numberOfPrePostImagesToWrite,
+ p().transactionOperations.getNumberOfPrePostImagesToWrite(),
wallClockTime);
abortGuard.dismiss();
@@ -1746,34 +1746,8 @@ void TransactionParticipant::Participant::addTransactionOperation(
o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber);
invariant(opCtx->lockState()->inAWriteUnitOfWork());
- const auto stmtIds = operation.getStatementIds();
- for (auto stmtId : stmtIds) {
- auto [_, inserted] = p().transactionStmtIds.insert(stmtId);
- uassert(5875600,
- str::stream() << "Found two operations using the same stmtId of " << stmtId,
- inserted);
- }
- p().transactionOperations.push_back(operation);
-
- p().transactionOperationBytes +=
- repl::DurableOplogEntry::getDurableReplOperationSize(operation);
- if (!operation.getPreImage().isEmpty()) {
- p().transactionOperationBytes += operation.getPreImage().objsize();
- if (operation.isPreImageRecordedForRetryableInternalTransaction()) {
- ++p().numberOfPrePostImagesToWrite;
- }
- }
- if (!operation.getPostImage().isEmpty()) {
- p().transactionOperationBytes += operation.getPostImage().objsize();
- ++p().numberOfPrePostImagesToWrite;
- }
-
- auto transactionSizeLimitBytes = gTransactionSizeLimitBytes.load();
- uassert(ErrorCodes::TransactionTooLarge,
- str::stream() << "Total size of all transaction operations must be less than "
- << "server parameter 'transactionSizeLimitBytes' = "
- << transactionSizeLimitBytes,
- p().transactionOperationBytes <= static_cast<size_t>(transactionSizeLimitBytes));
+ auto transactionSizeLimitBytes = static_cast<std::size_t>(gTransactionSizeLimitBytes.load());
+ uassertStatusOK(p().transactionOperations.addOperation(operation, transactionSizeLimitBytes));
}
std::vector<repl::ReplOperation>*
@@ -1785,14 +1759,14 @@ TransactionParticipant::Participant::retrieveCompletedTransactionOperations(
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
str::stream() << "Current state: " << o().txnState);
- return &(p().transactionOperations);
+ return p().transactionOperations.getMutableOperationsForTransactionParticipant();
}
TxnResponseMetadata TransactionParticipant::Participant::getResponseMetadata() {
// Currently the response metadata only contains a single field, which is whether or not the
// transaction is read-only so far.
return {o().txnState.isInSet(TransactionState::kInProgress) &&
- p().transactionOperations.empty()};
+ p().transactionOperations.isEmpty()};
}
void TransactionParticipant::Participant::clearOperationsInMemory(OperationContext* opCtx) {
@@ -1800,10 +1774,7 @@ void TransactionParticipant::Participant::clearOperationsInMemory(OperationConte
invariant(o().txnState.isInSet(TransactionState::kPrepared | TransactionState::kInProgress),
str::stream() << "Current state: " << o().txnState);
invariant(p().autoCommit);
- p().transactionOperationBytes = 0;
p().transactionOperations.clear();
- p().transactionStmtIds.clear();
- p().numberOfPrePostImagesToWrite = 0;
}
void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationContext* opCtx) {
@@ -1815,7 +1786,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
- opObserver->onUnpreparedTransactionCommit(opCtx, txnOps, p().numberOfPrePostImagesToWrite);
+ opObserver->onUnpreparedTransactionCommit(
+ opCtx, txnOps, p().transactionOperations.getNumberOfPrePostImagesToWrite());
// Read-only transactions with all read concerns must wait for any data they read to be majority
// committed. For local read concern this is to match majority read concern. For both local and
@@ -1827,8 +1799,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
auto wc = opCtx->getWriteConcern();
auto needsNoopWrite = txnOps->empty() && !opCtx->getWriteConcern().usedDefaultConstructedWC;
- const size_t operationCount = p().transactionOperations.size();
- const size_t oplogOperationBytes = p().transactionOperationBytes;
+ auto operationCount = p().transactionOperations.numOperations();
+ auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes();
clearOperationsInMemory(opCtx);
// _commitStorageTransaction can throw, but it is safe for the exception to be bubbled up to
@@ -1956,8 +1928,8 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
commitTimestamp,
*retrieveCompletedTransactionOperations(opCtx));
- const size_t operationCount = p().transactionOperations.size();
- const size_t oplogOperationBytes = p().transactionOperationBytes;
+ auto operationCount = p().transactionOperations.numOperations();
+ auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes();
clearOperationsInMemory(opCtx);
_finishCommitTransaction(opCtx, operationCount, oplogOperationBytes);
@@ -3127,10 +3099,7 @@ void TransactionParticipant::Participant::_resetTransactionStateAndUnlock(
o(*lk).txnState.transitionTo(state);
}
- p().transactionOperationBytes = 0;
p().transactionOperations.clear();
- p().transactionStmtIds.clear();
- p().numberOfPrePostImagesToWrite = 0;
o(*lk).prepareOpTime = repl::OpTime();
o(*lk).recoveryPrepareOpTime = repl::OpTime();
p().autoCommit = boost::none;
diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h
index ad7d1cb14ff..6b500f5dfcf 100644
--- a/src/mongo/db/transaction/transaction_participant.h
+++ b/src/mongo/db/transaction/transaction_participant.h
@@ -51,6 +51,7 @@
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/storage/storage_engine.h"
#include "mongo/db/transaction/transaction_metrics_observer.h"
+#include "mongo/db/transaction/transaction_operations.h"
#include "mongo/idl/mutable_observer_registry.h"
#include "mongo/logv2/attribute_storage.h"
#include "mongo/stdx/unordered_map.h"
@@ -754,11 +755,11 @@ public:
}
std::vector<repl::ReplOperation> getTransactionOperationsForTest() const {
- return p().transactionOperations;
+ return p().transactionOperations.getOperationsForTest();
}
size_t getNumberOfPrePostImagesToWriteForTest() const {
- return p().numberOfPrePostImagesToWrite;
+ return p().transactionOperations.getNumberOfPrePostImagesToWrite();
}
const Locker* getTxnResourceStashLockerForTest() const {
@@ -1153,18 +1154,7 @@ private:
// Holds oplog data for operations which have been applied in the current multi-document
// transaction.
- std::vector<repl::ReplOperation> transactionOperations;
-
- // Holds stmtIds for operations which have been applied in the current multi-document
- // transaction.
- stdx::unordered_set<StmtId> transactionStmtIds;
-
- // Total size in bytes of all operations within the _transactionOperations vector.
- size_t transactionOperationBytes{0};
-
- // Number of operations that have pre-images or post-images to be written to noop oplog
- // entries or the image collection.
- size_t numberOfPrePostImagesToWrite{0};
+ TransactionOperations transactionOperations;
// The autocommit setting of this transaction. Should always be false for multi-statement
// transaction. Currently only needed for diagnostics reporting.