diff options
author | Benety Goh <benety@mongodb.com> | 2022-09-15 18:35:00 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-15 23:37:30 +0000 |
commit | f4b9d3b6d94765c1bd404ef42cf8fc130b7d9e31 (patch) | |
tree | 79b91dedd95acd161fe78f24d865c4490fa42f29 | |
parent | 6045fe2ee565f3eac4cccea8f5ff5aeff6e6f156 (diff) | |
download | mongo-f4b9d3b6d94765c1bd404ef42cf8fc130b7d9e31.tar.gz |
SERVER-68860 integrate TransactionOperations into TransactionParticipant
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.cpp | 69 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.h | 18 |
2 files changed, 23 insertions, 64 deletions
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index 1a811e6f68b..73dd77aac99 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -903,7 +903,7 @@ void TransactionParticipant::Participant::_beginMultiDocumentTransaction( tickSource, now, *o().transactionExpireDate); - invariant(p().transactionOperations.empty()); + invariant(p().transactionOperations.isEmpty()); } } @@ -1645,7 +1645,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( // Even if the prepared transaction contained no statements, we always reserve at least // 1 oplog slot for the prepare oplog entry. auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->size(); - numSlotsToReserve += p().numberOfPrePostImagesToWrite; + numSlotsToReserve += p().transactionOperations.getNumberOfPrePostImagesToWrite(); oplogSlotReserver.emplace(opCtx, std::max(1, static_cast<int>(numSlotsToReserve))); invariant(oplogSlotReserver->getSlots().size() >= 1); prepareOplogSlot = oplogSlotReserver->getLastSlot(); @@ -1671,12 +1671,12 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( } auto opObserver = opCtx->getServiceContext()->getOpObserver(); const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now(); - auto applyOpsOplogSlotAndOperationAssignment = - opObserver->preTransactionPrepare(opCtx, - reservedSlots, - p().numberOfPrePostImagesToWrite, - wallClockTime, - completedTransactionOperations); + auto applyOpsOplogSlotAndOperationAssignment = opObserver->preTransactionPrepare( + opCtx, + reservedSlots, + p().transactionOperations.getNumberOfPrePostImagesToWrite(), + wallClockTime, + completedTransactionOperations); opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); @@ -1685,7 +1685,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( reservedSlots, completedTransactionOperations, applyOpsOplogSlotAndOperationAssignment.get(), - p().numberOfPrePostImagesToWrite, + p().transactionOperations.getNumberOfPrePostImagesToWrite(), wallClockTime); abortGuard.dismiss(); @@ -1746,34 +1746,8 @@ void TransactionParticipant::Participant::addTransactionOperation( o().activeTxnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); - const auto stmtIds = operation.getStatementIds(); - for (auto stmtId : stmtIds) { - auto [_, inserted] = p().transactionStmtIds.insert(stmtId); - uassert(5875600, - str::stream() << "Found two operations using the same stmtId of " << stmtId, - inserted); - } - p().transactionOperations.push_back(operation); - - p().transactionOperationBytes += - repl::DurableOplogEntry::getDurableReplOperationSize(operation); - if (!operation.getPreImage().isEmpty()) { - p().transactionOperationBytes += operation.getPreImage().objsize(); - if (operation.isPreImageRecordedForRetryableInternalTransaction()) { - ++p().numberOfPrePostImagesToWrite; - } - } - if (!operation.getPostImage().isEmpty()) { - p().transactionOperationBytes += operation.getPostImage().objsize(); - ++p().numberOfPrePostImagesToWrite; - } - - auto transactionSizeLimitBytes = gTransactionSizeLimitBytes.load(); - uassert(ErrorCodes::TransactionTooLarge, - str::stream() << "Total size of all transaction operations must be less than " - << "server parameter 'transactionSizeLimitBytes' = " - << transactionSizeLimitBytes, - p().transactionOperationBytes <= static_cast<size_t>(transactionSizeLimitBytes)); + auto transactionSizeLimitBytes = static_cast<std::size_t>(gTransactionSizeLimitBytes.load()); + uassertStatusOK(p().transactionOperations.addOperation(operation, transactionSizeLimitBytes)); } std::vector<repl::ReplOperation>* @@ -1785,14 +1759,14 @@ TransactionParticipant::Participant::retrieveCompletedTransactionOperations( invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared), str::stream() << "Current state: " << o().txnState); - return &(p().transactionOperations); + return p().transactionOperations.getMutableOperationsForTransactionParticipant(); } TxnResponseMetadata TransactionParticipant::Participant::getResponseMetadata() { // Currently the response metadata only contains a single field, which is whether or not the // transaction is read-only so far. return {o().txnState.isInSet(TransactionState::kInProgress) && - p().transactionOperations.empty()}; + p().transactionOperations.isEmpty()}; } void TransactionParticipant::Participant::clearOperationsInMemory(OperationContext* opCtx) { @@ -1800,10 +1774,7 @@ void TransactionParticipant::Participant::clearOperationsInMemory(OperationConte invariant(o().txnState.isInSet(TransactionState::kPrepared | TransactionState::kInProgress), str::stream() << "Current state: " << o().txnState); invariant(p().autoCommit); - p().transactionOperationBytes = 0; p().transactionOperations.clear(); - p().transactionStmtIds.clear(); - p().numberOfPrePostImagesToWrite = 0; } void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationContext* opCtx) { @@ -1815,7 +1786,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); - opObserver->onUnpreparedTransactionCommit(opCtx, txnOps, p().numberOfPrePostImagesToWrite); + opObserver->onUnpreparedTransactionCommit( + opCtx, txnOps, p().transactionOperations.getNumberOfPrePostImagesToWrite()); // Read-only transactions with all read concerns must wait for any data they read to be majority // committed. For local read concern this is to match majority read concern. For both local and @@ -1827,8 +1799,8 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC auto wc = opCtx->getWriteConcern(); auto needsNoopWrite = txnOps->empty() && !opCtx->getWriteConcern().usedDefaultConstructedWC; - const size_t operationCount = p().transactionOperations.size(); - const size_t oplogOperationBytes = p().transactionOperationBytes; + auto operationCount = p().transactionOperations.numOperations(); + auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes(); clearOperationsInMemory(opCtx); // _commitStorageTransaction can throw, but it is safe for the exception to be bubbled up to @@ -1956,8 +1928,8 @@ void TransactionParticipant::Participant::commitPreparedTransaction( commitTimestamp, *retrieveCompletedTransactionOperations(opCtx)); - const size_t operationCount = p().transactionOperations.size(); - const size_t oplogOperationBytes = p().transactionOperationBytes; + auto operationCount = p().transactionOperations.numOperations(); + auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes(); clearOperationsInMemory(opCtx); _finishCommitTransaction(opCtx, operationCount, oplogOperationBytes); @@ -3127,10 +3099,7 @@ void TransactionParticipant::Participant::_resetTransactionStateAndUnlock( o(*lk).txnState.transitionTo(state); } - p().transactionOperationBytes = 0; p().transactionOperations.clear(); - p().transactionStmtIds.clear(); - p().numberOfPrePostImagesToWrite = 0; o(*lk).prepareOpTime = repl::OpTime(); o(*lk).recoveryPrepareOpTime = repl::OpTime(); p().autoCommit = boost::none; diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h index ad7d1cb14ff..6b500f5dfcf 100644 --- a/src/mongo/db/transaction/transaction_participant.h +++ b/src/mongo/db/transaction/transaction_participant.h @@ -51,6 +51,7 @@ #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/storage/storage_engine.h" #include "mongo/db/transaction/transaction_metrics_observer.h" +#include "mongo/db/transaction/transaction_operations.h" #include "mongo/idl/mutable_observer_registry.h" #include "mongo/logv2/attribute_storage.h" #include "mongo/stdx/unordered_map.h" @@ -754,11 +755,11 @@ public: } std::vector<repl::ReplOperation> getTransactionOperationsForTest() const { - return p().transactionOperations; + return p().transactionOperations.getOperationsForTest(); } size_t getNumberOfPrePostImagesToWriteForTest() const { - return p().numberOfPrePostImagesToWrite; + return p().transactionOperations.getNumberOfPrePostImagesToWrite(); } const Locker* getTxnResourceStashLockerForTest() const { @@ -1153,18 +1154,7 @@ private: // Holds oplog data for operations which have been applied in the current multi-document // transaction. - std::vector<repl::ReplOperation> transactionOperations; - - // Holds stmtIds for operations which have been applied in the current multi-document - // transaction. - stdx::unordered_set<StmtId> transactionStmtIds; - - // Total size in bytes of all operations within the _transactionOperations vector. - size_t transactionOperationBytes{0}; - - // Number of operations that have pre-images or post-images to be written to noop oplog - // entries or the image collection. - size_t numberOfPrePostImagesToWrite{0}; + TransactionOperations transactionOperations; // The autocommit setting of this transaction. Should always be false for multi-statement // transaction. Currently only needed for diagnostics reporting. |