diff options
author | Benety Goh <benety@mongodb.com> | 2022-10-12 09:11:15 -0400 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-12 14:11:50 +0000 |
commit | d6f85073849817aa6224f5c7f0c48f36a3c24ff3 (patch) | |
tree | 410de4a2b9639a6eeae42ce909455b66c2cbdf6d /src/mongo | |
parent | ea85b99ada1fe21761f1e4f442f53509fda09d3a (diff) | |
download | mongo-d6f85073849817aa6224f5c7f0c48f36a3c24ff3.tar.gz |
SERVER-69749 TransactionParticipant::retrieveCompletedTransactionOperations() returns TransactionOperations
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/op_observer/op_observer_impl_test.cpp | 67 | ||||
-rw-r--r-- | src/mongo/db/s/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/s/resharding/resharding_oplog_application.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.cpp | 51 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.h | 3 |
5 files changed, 71 insertions, 53 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp index c63dbbff150..f08f1a9592f 100644 --- a/src/mongo/db/op_observer/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp @@ -135,7 +135,9 @@ void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObse auto txnParticipant = TransactionParticipant::get(opCtx); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx); opObserver.onUnpreparedTransactionCommit( - opCtx, txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest()); + opCtx, + txnOps->getMutableOperationsForOpObserver(), + txnParticipant.getNumberOfPrePostImagesToWriteForTest()); } std::vector<repl::OpTime> reserveOpTimesInSideTransaction(OperationContext* opCtx, size_t count) { @@ -1146,12 +1148,12 @@ protected: size_t numberOfPrePostImagesToWrite = 0) { auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx()); auto currentTime = Date_t::now(); - auto applyOpsAssignment = - opObserver().preTransactionPrepare(opCtx(), reservedSlots, currentTime, txnOps); + auto applyOpsAssignment = opObserver().preTransactionPrepare( + opCtx(), reservedSlots, currentTime, txnOps->getMutableOperationsForOpObserver()); opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp()); opObserver().onTransactionPrepare(opCtx(), reservedSlots, - txnOps, + txnOps->getMutableOperationsForOpObserver(), applyOpsAssignment.get(), numberOfPrePostImagesToWrite, currentTime); @@ -1366,7 +1368,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { opCtx(), commitSlot, prepareTimestamp, - *txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + *(txnParticipant.retrieveCompletedTransactionOperations(opCtx()) + ->getMutableOperationsForOpObserver())); } repl::OplogInterfaceLocal oplogInterface(opCtx()); auto oplogIter = oplogInterface.makeIterator(); @@ -1592,7 +1595,8 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT } auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); opCtx()->getWriteUnitOfWork()->commit(); assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted); @@ -1604,7 +1608,8 @@ TEST_F(OpObserverTransactionTest, txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction"); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); txnParticipant.stashTransactionResources(opCtx()); @@ -1644,7 +1649,8 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti opCtx(), commitSlot, prepareOpTime.getTimestamp(), - *txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + *(txnParticipant.retrieveCompletedTransactionOperations(opCtx()) + ->getMutableOperationsForOpObserver())); } assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted); } @@ -1673,7 +1679,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) { opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -1737,7 +1744,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTestIncludesTenantId) { opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); @@ -1810,7 +1818,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) { opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1864,7 +1873,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) { opObserver().onUpdate(opCtx(), update2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); @@ -1915,7 +1925,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) { << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, {}); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntry = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntry); auto o = oplogEntry.getObjectField("o"); @@ -1956,7 +1967,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTestIncludesTenantId) { opObserver().onDelete(opCtx(), nss2, uuid2, 0, {}); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObj = getSingleOplogEntry(opCtx()); checkCommonFields(oplogEntryObj); @@ -2003,7 +2015,8 @@ TEST_F(OpObserverTransactionTest, donorMtab->startBlockingWrites(); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0), + ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0), DBException, ErrorCodes::TenantMigrationConflict); @@ -3427,7 +3440,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) { WriteUnitOfWork wuow(opCtx()); opObserver().onInserts(opCtx(), *autoColl, inserts.begin(), inserts.end(), false); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0]; checkSessionAndTransactionFields(oplogEntryObj); auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj)); @@ -3460,7 +3474,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) { opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 4); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -3537,7 +3552,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) { opObserver().onUpdate(opCtx(), update1); opObserver().onUpdate(opCtx(), update2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -3595,7 +3611,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) { << "y")); opObserver().onDelete(opCtx(), nss2, uuid2, 0, {}); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -3896,7 +3913,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) { opCtx(), commitSlot, commitTimestamp, - *txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + *(txnParticipant.retrieveCompletedTransactionOperations(opCtx()) + ->getMutableOperationsForOpObserver())); } oplogEntryObjs = getNOplogEntries(opCtx(), 3); const auto commitOplogObj = oplogEntryObjs.back(); @@ -3995,7 +4013,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) { opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false); opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 1); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; @@ -4144,7 +4163,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) { opCtx(), commitSlot, commitTimestamp, - *txnParticipant.retrieveCompletedTransactionOperations(opCtx())); + *(txnParticipant.retrieveCompletedTransactionOperations(opCtx()) + ->getMutableOperationsForOpObserver())); oplogEntryObjs = getNOplogEntries(opCtx(), 2); const auto commitOplogObj = oplogEntryObjs.back(); @@ -4203,7 +4223,8 @@ TEST_F(OpObserverLargeTransactionTest, LargeTransactionCreatesMultipleOplogEntri txnParticipant.addTransactionOperation(opCtx(), operation1); txnParticipant.addTransactionOperation(opCtx(), operation2); auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx()); - opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0); + opObserver().onUnpreparedTransactionCommit( + opCtx(), txnOps->getMutableOperationsForOpObserver(), 0); auto oplogEntryObjs = getNOplogEntries(opCtx(), 2); std::vector<OplogEntry> oplogEntries; mongo::repl::OpTime expectedPrevWriteOpTime; diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 485bc015547..789a7fcec6a 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -225,6 +225,7 @@ env.Library( '$BUILD_DIR/mongo/db/session/session_catalog_mongod', '$BUILD_DIR/mongo/db/storage/remove_saver', '$BUILD_DIR/mongo/db/timeseries/bucket_catalog', + '$BUILD_DIR/mongo/db/transaction/transaction_operations', '$BUILD_DIR/mongo/s/common_s', '$BUILD_DIR/mongo/util/future_util', 'sharding_catalog', diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp index f7d4dcb468f..cbdc9319c59 100644 --- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp +++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp @@ -105,7 +105,7 @@ void runWithTransaction(OperationContext* opCtx, func(asr.opCtx()); - if (txnParticipant.retrieveCompletedTransactionOperations(asr.opCtx())->size() > 0) { + if (!txnParticipant.retrieveCompletedTransactionOperations(asr.opCtx())->isEmpty()) { // Similar to the `isTimestamped` check in `applyOperation`, we only want to commit the // transaction if we're doing replicated writes. txnParticipant.commitUnpreparedTransaction(asr.opCtx()); diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index 67b1d38f664..ef9ea4bd469 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -1614,16 +1614,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( // Create a set of collection UUIDs through which to iterate, so that we do not recheck the same // collection multiple times: it is a costly check. - stdx::unordered_set<UUID, UUID::Hash> transactionOperationUuids; - for (const auto& transactionOp : *completedTransactionOperations) { - if (transactionOp.getOpType() == repl::OpTypeEnum::kNoop) { - // No-ops can't modify data, so there's no need to check if they involved a temporary - // collection. - continue; - } - - transactionOperationUuids.insert(transactionOp.getUuid().value()); - } + auto transactionOperationUuids = completedTransactionOperations->getCollectionUUIDs(); auto catalog = CollectionCatalog::get(opCtx); for (const auto& uuid : transactionOperationUuids) { auto collection = catalog->lookupCollectionByUUID(opCtx, uuid); @@ -1653,7 +1644,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( } else { // Even if the prepared transaction contained no statements, we always reserve at least // 1 oplog slot for the prepare oplog entry. - auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->size(); + auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->numOperations(); numSlotsToReserve += p().transactionOperations.getNumberOfPrePostImagesToWrite(); oplogSlotReserver.emplace(opCtx, std::max(1, static_cast<int>(numSlotsToReserve))); invariant(oplogSlotReserver->getSlots().size() >= 1); @@ -1681,17 +1672,21 @@ Timestamp TransactionParticipant::Participant::prepareTransaction( auto opObserver = opCtx->getServiceContext()->getOpObserver(); const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now(); auto applyOpsOplogSlotAndOperationAssignment = opObserver->preTransactionPrepare( - opCtx, reservedSlots, wallClockTime, completedTransactionOperations); + opCtx, + reservedSlots, + wallClockTime, + completedTransactionOperations->getMutableOperationsForOpObserver()); opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); p().needToWriteAbortEntry = true; - opObserver->onTransactionPrepare(opCtx, - reservedSlots, - completedTransactionOperations, - applyOpsOplogSlotAndOperationAssignment.get(), - p().transactionOperations.getNumberOfPrePostImagesToWrite(), - wallClockTime); + opObserver->onTransactionPrepare( + opCtx, + reservedSlots, + completedTransactionOperations->getMutableOperationsForOpObserver(), + applyOpsOplogSlotAndOperationAssignment.get(), + p().transactionOperations.getNumberOfPrePostImagesToWrite(), + wallClockTime); abortGuard.dismiss(); @@ -1755,8 +1750,7 @@ void TransactionParticipant::Participant::addTransactionOperation( uassertStatusOK(p().transactionOperations.addOperation(operation, transactionSizeLimitBytes)); } -std::vector<repl::ReplOperation>* -TransactionParticipant::Participant::retrieveCompletedTransactionOperations( +TransactionOperations* TransactionParticipant::Participant::retrieveCompletedTransactionOperations( OperationContext* opCtx) { // Ensure that we only ever retrieve a transaction's completed operations when in progress @@ -1764,7 +1758,7 @@ TransactionParticipant::Participant::retrieveCompletedTransactionOperations( invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared), str::stream() << "Current state: " << o().txnState); - return p().transactionOperations.getMutableOperationsForOpObserver(); + return &(p().transactionOperations); } TxnResponseMetadata TransactionParticipant::Participant::getResponseMetadata() { @@ -1792,7 +1786,9 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC invariant(opObserver); opObserver->onUnpreparedTransactionCommit( - opCtx, txnOps, p().transactionOperations.getNumberOfPrePostImagesToWrite()); + opCtx, + txnOps->getMutableOperationsForOpObserver(), + p().transactionOperations.getNumberOfPrePostImagesToWrite()); // Read-only transactions with all read concerns must wait for any data they read to be majority // committed. For local read concern this is to match majority read concern. For both local and @@ -1802,7 +1798,7 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC // // TODO (SERVER-41165): Snapshot read concern should wait on the read timestamp instead. auto wc = opCtx->getWriteConcern(); - auto needsNoopWrite = txnOps->empty() && !opCtx->getWriteConcern().usedDefaultConstructedWC; + auto needsNoopWrite = txnOps->isEmpty() && !opCtx->getWriteConcern().usedDefaultConstructedWC; auto operationCount = p().transactionOperations.numOperations(); auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes(); @@ -1928,10 +1924,11 @@ void TransactionParticipant::Participant::commitPreparedTransaction( invariant(opObserver); // Once the transaction is committed, the oplog entry must be written. - opObserver->onPreparedTransactionCommit(opCtx, - commitOplogSlot, - commitTimestamp, - *retrieveCompletedTransactionOperations(opCtx)); + opObserver->onPreparedTransactionCommit( + opCtx, + commitOplogSlot, + commitTimestamp, + *retrieveCompletedTransactionOperations(opCtx)->getMutableOperationsForOpObserver()); auto operationCount = p().transactionOperations.numOperations(); auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes(); diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h index 6b500f5dfcf..ac32f9fb32d 100644 --- a/src/mongo/db/transaction/transaction_participant.h +++ b/src/mongo/db/transaction/transaction_participant.h @@ -618,8 +618,7 @@ public: * to the transaction. It is legal to call this method only when the transaction state is * in progress or committed. */ - std::vector<repl::ReplOperation>* retrieveCompletedTransactionOperations( - OperationContext* opCtx); + TransactionOperations* retrieveCompletedTransactionOperations(OperationContext* opCtx); /** * Returns an object containing transaction-related metadata to append on responses. |