summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
authorBenety Goh <benety@mongodb.com>2022-10-12 09:11:15 -0400
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-10-12 14:11:50 +0000
commitd6f85073849817aa6224f5c7f0c48f36a3c24ff3 (patch)
tree410de4a2b9639a6eeae42ce909455b66c2cbdf6d /src/mongo
parentea85b99ada1fe21761f1e4f442f53509fda09d3a (diff)
downloadmongo-d6f85073849817aa6224f5c7f0c48f36a3c24ff3.tar.gz
SERVER-69749 TransactionParticipant::retrieveCompletedTransactionOperations() returns TransactionOperations
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/op_observer/op_observer_impl_test.cpp67
-rw-r--r--src/mongo/db/s/SConscript1
-rw-r--r--src/mongo/db/s/resharding/resharding_oplog_application.cpp2
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp51
-rw-r--r--src/mongo/db/transaction/transaction_participant.h3
5 files changed, 71 insertions, 53 deletions
diff --git a/src/mongo/db/op_observer/op_observer_impl_test.cpp b/src/mongo/db/op_observer/op_observer_impl_test.cpp
index c63dbbff150..f08f1a9592f 100644
--- a/src/mongo/db/op_observer/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer/op_observer_impl_test.cpp
@@ -135,7 +135,9 @@ void commitUnpreparedTransaction(OperationContext* opCtx, OpObserverType& opObse
auto txnParticipant = TransactionParticipant::get(opCtx);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx);
opObserver.onUnpreparedTransactionCommit(
- opCtx, txnOps, txnParticipant.getNumberOfPrePostImagesToWriteForTest());
+ opCtx,
+ txnOps->getMutableOperationsForOpObserver(),
+ txnParticipant.getNumberOfPrePostImagesToWriteForTest());
}
std::vector<repl::OpTime> reserveOpTimesInSideTransaction(OperationContext* opCtx, size_t count) {
@@ -1146,12 +1148,12 @@ protected:
size_t numberOfPrePostImagesToWrite = 0) {
auto txnOps = txnParticipant().retrieveCompletedTransactionOperations(opCtx());
auto currentTime = Date_t::now();
- auto applyOpsAssignment =
- opObserver().preTransactionPrepare(opCtx(), reservedSlots, currentTime, txnOps);
+ auto applyOpsAssignment = opObserver().preTransactionPrepare(
+ opCtx(), reservedSlots, currentTime, txnOps->getMutableOperationsForOpObserver());
opCtx()->recoveryUnit()->setPrepareTimestamp(prepareOpTime.getTimestamp());
opObserver().onTransactionPrepare(opCtx(),
reservedSlots,
- txnOps,
+ txnOps->getMutableOperationsForOpObserver(),
applyOpsAssignment.get(),
numberOfPrePostImagesToWrite,
currentTime);
@@ -1366,7 +1368,8 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
opCtx(),
commitSlot,
prepareTimestamp,
- *txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ *(txnParticipant.retrieveCompletedTransactionOperations(opCtx())
+ ->getMutableOperationsForOpObserver()));
}
repl::OplogInterfaceLocal oplogInterface(opCtx());
auto oplogIter = oplogInterface.makeIterator();
@@ -1592,7 +1595,8 @@ TEST_F(OpObserverTransactionTest, CommittingUnpreparedNonEmptyTransactionWritesT
}
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
opCtx()->getWriteUnitOfWork()->commit();
assertTxnRecord(txnNum(), {}, DurableTxnStateEnum::kCommitted);
@@ -1604,7 +1608,8 @@ TEST_F(OpObserverTransactionTest,
txnParticipant.unstashTransactionResources(opCtx(), "prepareTransaction");
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
txnParticipant.stashTransactionResources(opCtx());
@@ -1644,7 +1649,8 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
opCtx(),
commitSlot,
prepareOpTime.getTimestamp(),
- *txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ *(txnParticipant.retrieveCompletedTransactionOperations(opCtx())
+ ->getMutableOperationsForOpObserver()));
}
assertTxnRecord(txnNum(), commitOpTime, DurableTxnStateEnum::kCommitted);
}
@@ -1673,7 +1679,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTest) {
opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
OplogEntry oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
@@ -1737,7 +1744,8 @@ TEST_F(OpObserverTransactionTest, TransactionalInsertTestIncludesTenantId) {
opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
@@ -1810,7 +1818,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTest) {
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
@@ -1864,7 +1873,8 @@ TEST_F(OpObserverTransactionTest, TransactionalUpdateTestIncludesTenantId) {
opObserver().onUpdate(opCtx(), update2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
@@ -1915,7 +1925,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTest) {
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntry = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntry);
auto o = oplogEntry.getObjectField("o");
@@ -1956,7 +1967,8 @@ TEST_F(OpObserverTransactionTest, TransactionalDeleteTestIncludesTenantId) {
opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObj = getSingleOplogEntry(opCtx());
checkCommonFields(oplogEntryObj);
@@ -2003,7 +2015,8 @@ TEST_F(OpObserverTransactionTest,
donorMtab->startBlockingWrites();
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0),
+ ASSERT_THROWS_CODE(opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0),
DBException,
ErrorCodes::TenantMigrationConflict);
@@ -3427,7 +3440,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionSingleStatementTest) {
WriteUnitOfWork wuow(opCtx());
opObserver().onInserts(opCtx(), *autoColl, inserts.begin(), inserts.end(), false);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObj = getNOplogEntries(opCtx(), 1)[0];
checkSessionAndTransactionFields(oplogEntryObj);
auto oplogEntry = assertGet(OplogEntry::parse(oplogEntryObj));
@@ -3460,7 +3474,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalInsertTest) {
opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 4);
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -3537,7 +3552,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalUpdateTest) {
opObserver().onUpdate(opCtx(), update1);
opObserver().onUpdate(opCtx(), update2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -3595,7 +3611,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, TransactionalDeleteTest) {
<< "y"));
opObserver().onDelete(opCtx(), nss2, uuid2, 0, {});
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -3896,7 +3913,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedTest) {
opCtx(),
commitSlot,
commitTimestamp,
- *txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ *(txnParticipant.retrieveCompletedTransactionOperations(opCtx())
+ ->getMutableOperationsForOpObserver()));
}
oplogEntryObjs = getNOplogEntries(opCtx(), 3);
const auto commitOplogObj = oplogEntryObjs.back();
@@ -3995,7 +4013,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, UnpreparedTransactionPackingTest) {
opObserver().onInserts(opCtx(), *autoColl1, inserts1.begin(), inserts1.end(), false);
opObserver().onInserts(opCtx(), *autoColl2, inserts2.begin(), inserts2.end(), false);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 1);
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
@@ -4144,7 +4163,8 @@ TEST_F(OpObserverMultiEntryTransactionTest, CommitPreparedPackingTest) {
opCtx(),
commitSlot,
commitTimestamp,
- *txnParticipant.retrieveCompletedTransactionOperations(opCtx()));
+ *(txnParticipant.retrieveCompletedTransactionOperations(opCtx())
+ ->getMutableOperationsForOpObserver()));
oplogEntryObjs = getNOplogEntries(opCtx(), 2);
const auto commitOplogObj = oplogEntryObjs.back();
@@ -4203,7 +4223,8 @@ TEST_F(OpObserverLargeTransactionTest, LargeTransactionCreatesMultipleOplogEntri
txnParticipant.addTransactionOperation(opCtx(), operation1);
txnParticipant.addTransactionOperation(opCtx(), operation2);
auto txnOps = txnParticipant.retrieveCompletedTransactionOperations(opCtx());
- opObserver().onUnpreparedTransactionCommit(opCtx(), txnOps, 0);
+ opObserver().onUnpreparedTransactionCommit(
+ opCtx(), txnOps->getMutableOperationsForOpObserver(), 0);
auto oplogEntryObjs = getNOplogEntries(opCtx(), 2);
std::vector<OplogEntry> oplogEntries;
mongo::repl::OpTime expectedPrevWriteOpTime;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 485bc015547..789a7fcec6a 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -225,6 +225,7 @@ env.Library(
'$BUILD_DIR/mongo/db/session/session_catalog_mongod',
'$BUILD_DIR/mongo/db/storage/remove_saver',
'$BUILD_DIR/mongo/db/timeseries/bucket_catalog',
+ '$BUILD_DIR/mongo/db/transaction/transaction_operations',
'$BUILD_DIR/mongo/s/common_s',
'$BUILD_DIR/mongo/util/future_util',
'sharding_catalog',
diff --git a/src/mongo/db/s/resharding/resharding_oplog_application.cpp b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
index f7d4dcb468f..cbdc9319c59 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_application.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_application.cpp
@@ -105,7 +105,7 @@ void runWithTransaction(OperationContext* opCtx,
func(asr.opCtx());
- if (txnParticipant.retrieveCompletedTransactionOperations(asr.opCtx())->size() > 0) {
+ if (!txnParticipant.retrieveCompletedTransactionOperations(asr.opCtx())->isEmpty()) {
// Similar to the `isTimestamped` check in `applyOperation`, we only want to commit the
// transaction if we're doing replicated writes.
txnParticipant.commitUnpreparedTransaction(asr.opCtx());
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 67b1d38f664..ef9ea4bd469 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -1614,16 +1614,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
// Create a set of collection UUIDs through which to iterate, so that we do not recheck the same
// collection multiple times: it is a costly check.
- stdx::unordered_set<UUID, UUID::Hash> transactionOperationUuids;
- for (const auto& transactionOp : *completedTransactionOperations) {
- if (transactionOp.getOpType() == repl::OpTypeEnum::kNoop) {
- // No-ops can't modify data, so there's no need to check if they involved a temporary
- // collection.
- continue;
- }
-
- transactionOperationUuids.insert(transactionOp.getUuid().value());
- }
+ auto transactionOperationUuids = completedTransactionOperations->getCollectionUUIDs();
auto catalog = CollectionCatalog::get(opCtx);
for (const auto& uuid : transactionOperationUuids) {
auto collection = catalog->lookupCollectionByUUID(opCtx, uuid);
@@ -1653,7 +1644,7 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
} else {
// Even if the prepared transaction contained no statements, we always reserve at least
// 1 oplog slot for the prepare oplog entry.
- auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->size();
+ auto numSlotsToReserve = retrieveCompletedTransactionOperations(opCtx)->numOperations();
numSlotsToReserve += p().transactionOperations.getNumberOfPrePostImagesToWrite();
oplogSlotReserver.emplace(opCtx, std::max(1, static_cast<int>(numSlotsToReserve)));
invariant(oplogSlotReserver->getSlots().size() >= 1);
@@ -1681,17 +1672,21 @@ Timestamp TransactionParticipant::Participant::prepareTransaction(
auto opObserver = opCtx->getServiceContext()->getOpObserver();
const auto wallClockTime = opCtx->getServiceContext()->getFastClockSource()->now();
auto applyOpsOplogSlotAndOperationAssignment = opObserver->preTransactionPrepare(
- opCtx, reservedSlots, wallClockTime, completedTransactionOperations);
+ opCtx,
+ reservedSlots,
+ wallClockTime,
+ completedTransactionOperations->getMutableOperationsForOpObserver());
opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.getTimestamp());
opCtx->getWriteUnitOfWork()->prepare();
p().needToWriteAbortEntry = true;
- opObserver->onTransactionPrepare(opCtx,
- reservedSlots,
- completedTransactionOperations,
- applyOpsOplogSlotAndOperationAssignment.get(),
- p().transactionOperations.getNumberOfPrePostImagesToWrite(),
- wallClockTime);
+ opObserver->onTransactionPrepare(
+ opCtx,
+ reservedSlots,
+ completedTransactionOperations->getMutableOperationsForOpObserver(),
+ applyOpsOplogSlotAndOperationAssignment.get(),
+ p().transactionOperations.getNumberOfPrePostImagesToWrite(),
+ wallClockTime);
abortGuard.dismiss();
@@ -1755,8 +1750,7 @@ void TransactionParticipant::Participant::addTransactionOperation(
uassertStatusOK(p().transactionOperations.addOperation(operation, transactionSizeLimitBytes));
}
-std::vector<repl::ReplOperation>*
-TransactionParticipant::Participant::retrieveCompletedTransactionOperations(
+TransactionOperations* TransactionParticipant::Participant::retrieveCompletedTransactionOperations(
OperationContext* opCtx) {
// Ensure that we only ever retrieve a transaction's completed operations when in progress
@@ -1764,7 +1758,7 @@ TransactionParticipant::Participant::retrieveCompletedTransactionOperations(
invariant(o().txnState.isInSet(TransactionState::kInProgress | TransactionState::kPrepared),
str::stream() << "Current state: " << o().txnState);
- return p().transactionOperations.getMutableOperationsForOpObserver();
+ return &(p().transactionOperations);
}
TxnResponseMetadata TransactionParticipant::Participant::getResponseMetadata() {
@@ -1792,7 +1786,9 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
invariant(opObserver);
opObserver->onUnpreparedTransactionCommit(
- opCtx, txnOps, p().transactionOperations.getNumberOfPrePostImagesToWrite());
+ opCtx,
+ txnOps->getMutableOperationsForOpObserver(),
+ p().transactionOperations.getNumberOfPrePostImagesToWrite());
// Read-only transactions with all read concerns must wait for any data they read to be majority
// committed. For local read concern this is to match majority read concern. For both local and
@@ -1802,7 +1798,7 @@ void TransactionParticipant::Participant::commitUnpreparedTransaction(OperationC
//
// TODO (SERVER-41165): Snapshot read concern should wait on the read timestamp instead.
auto wc = opCtx->getWriteConcern();
- auto needsNoopWrite = txnOps->empty() && !opCtx->getWriteConcern().usedDefaultConstructedWC;
+ auto needsNoopWrite = txnOps->isEmpty() && !opCtx->getWriteConcern().usedDefaultConstructedWC;
auto operationCount = p().transactionOperations.numOperations();
auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes();
@@ -1928,10 +1924,11 @@ void TransactionParticipant::Participant::commitPreparedTransaction(
invariant(opObserver);
// Once the transaction is committed, the oplog entry must be written.
- opObserver->onPreparedTransactionCommit(opCtx,
- commitOplogSlot,
- commitTimestamp,
- *retrieveCompletedTransactionOperations(opCtx));
+ opObserver->onPreparedTransactionCommit(
+ opCtx,
+ commitOplogSlot,
+ commitTimestamp,
+ *retrieveCompletedTransactionOperations(opCtx)->getMutableOperationsForOpObserver());
auto operationCount = p().transactionOperations.numOperations();
auto oplogOperationBytes = p().transactionOperations.getTotalOperationBytes();
diff --git a/src/mongo/db/transaction/transaction_participant.h b/src/mongo/db/transaction/transaction_participant.h
index 6b500f5dfcf..ac32f9fb32d 100644
--- a/src/mongo/db/transaction/transaction_participant.h
+++ b/src/mongo/db/transaction/transaction_participant.h
@@ -618,8 +618,7 @@ public:
* to the transaction. It is legal to call this method only when the transaction state is
* in progress or committed.
*/
- std::vector<repl::ReplOperation>* retrieveCompletedTransactionOperations(
- OperationContext* opCtx);
+ TransactionOperations* retrieveCompletedTransactionOperations(OperationContext* opCtx);
/**
* Returns an object containing transaction-related metadata to append on responses.