summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorWenbin Zhu <wenbin.zhu@mongodb.com>2023-02-09 00:26:29 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2023-02-09 15:11:30 +0000
commite5ee746f9e459686d172ae99204f484e85652d49 (patch)
treee1b5b682ec18e74fbf1e640e75557d448373fbe3
parent1d79792d701ab899165a7c56108633bbeac3c924 (diff)
downloadmongo-e5ee746f9e459686d172ae99204f484e85652d49.tar.gz
SERVER-72762 Split commit and abort entries during secondary oplog application.
-rw-r--r--src/mongo/db/repl/oplog_applier_impl.cpp28
-rw-r--r--src/mongo/db/repl/oplog_applier_impl_test.cpp167
-rw-r--r--src/mongo/db/repl/oplog_applier_utils.cpp83
-rw-r--r--src/mongo/db/repl/oplog_applier_utils.h81
-rw-r--r--src/mongo/db/repl/oplog_entry.cpp4
-rw-r--r--src/mongo/db/repl/oplog_entry.h8
-rw-r--r--src/mongo/db/repl/oplog_entry_or_grouped_inserts.h3
-rw-r--r--src/mongo/db/repl/split_prepare_session_manager.cpp28
-rw-r--r--src/mongo/db/repl/split_prepare_session_manager.h39
-rw-r--r--src/mongo/db/repl/split_prepare_session_manager_test.cpp59
-rw-r--r--src/mongo/db/transaction/transaction_participant.cpp3
-rw-r--r--src/mongo/db/transaction/transaction_participant_test.cpp27
12 files changed, 405 insertions, 125 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp
index 35fa85e2ced..885d9f923cd 100644
--- a/src/mongo/db/repl/oplog_applier_impl.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl.cpp
@@ -116,8 +116,8 @@ void _addOplogChainOpsToWriterVectors(OperationContext* opCtx,
std::vector<OplogEntry*>* partialTxnList,
std::vector<std::vector<OplogEntry>>* derivedOps,
OplogEntry* op,
- CachedCollectionProperties* collPropertiesCache,
- std::vector<std::vector<ApplierOperation>>* writerVectors) {
+ std::vector<std::vector<ApplierOperation>>* writerVectors,
+ CachedCollectionProperties* collPropertiesCache) {
auto [txnOps, shouldSerialize] =
readTransactionOperationsFromOplogChainAndCheckForCommands(opCtx, *op, *partialTxnList);
derivedOps->emplace_back(std::move(txnOps));
@@ -695,9 +695,8 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
std::vector<std::vector<OplogEntry>>* derivedOps,
SessionUpdateTracker* sessionUpdateTracker) noexcept {
- // Caches partial transaction operations. Each map entry
- // contains a cumulative list of operations seen in this batch so
- // far.
+ // Caches partial transaction operations. Each map entry contains a cumulative list
+ // of operations seen in this batch so far.
stdx::unordered_map<OpTime, std::vector<OplogEntry*>, OpTime::Hasher> partialTxnOps;
// Provided to _addOplogChainOpsToWriterVectors() when 'partialTxnOps' does not have any entries
@@ -791,7 +790,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
// Flush partialTxnList operations for current transaction.
auto* partialTxnList = getPartialTxnList(op);
_addOplogChainOpsToWriterVectors(
- opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors);
+ opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache);
invariant(partialTxnList->empty(), op.toStringForLogging());
} else {
// The applyOps entry was not generated as part of a transaction.
@@ -811,18 +810,21 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
if (repl::feature_flags::gApplyPreparedTxnsInParallel.isEnabled(
serverGlobalParams.featureCompatibility)) {
- // Prepare entries in secondary mode do not come in their own batch, we will extract
- // applyOps operations and fill writers with the extracted operations.
+ // Prepare entries in secondary mode do not come in their own batch, extract applyOps
+ // operations and fill writers with the extracted operations.
if (op.shouldPrepare() && (getOptions().mode == OplogApplication::Mode::kSecondary)) {
auto* partialTxnList = getPartialTxnList(op);
_addOplogChainOpsToWriterVectors(
- opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors);
- invariant(partialTxnList->empty(), op.toStringForLogging());
+ opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache);
continue;
}
- if (op.isPreparedCommit() &&
+
+ // Fill the writers with commit or abort operation. Depending on whether the operation
+ // refers to a split prepare, it might also be split into multiple ops.
+ if ((op.isPreparedCommit() || op.isPreparedAbort()) &&
(getOptions().mode == OplogApplication::Mode::kSecondary)) {
- // TODO (SERVER-72762): split commit oplog entries.
+ OplogApplierUtils::addDerivedCommitsOrAborts(
+ opCtx, &op, writerVectors, &collPropertiesCache);
continue;
}
}
@@ -833,7 +835,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors(
if (op.isPreparedCommit() && (getOptions().mode == OplogApplication::Mode::kInitialSync)) {
auto* partialTxnList = getPartialTxnList(op);
_addOplogChainOpsToWriterVectors(
- opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors);
+ opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache);
invariant(partialTxnList->empty(), op.toStringForLogging());
continue;
}
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index 42e978295a7..ed68d116056 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -4824,18 +4824,39 @@ protected:
_writerPool.get());
}
- auto makePrepareOplogForCrudBatch(const std::vector<BSONObj>& batch,
- const OpTime opTime,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
+ auto makePrepareOplogEntry(const std::vector<BSONObj>& docs,
+ const OpTime opTime,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
BSONArrayBuilder arrBuilder;
- arrBuilder.append(batch.begin(), batch.end());
+ arrBuilder.append(docs.begin(), docs.end());
auto command = BSON("applyOps" << arrBuilder.arr() << "prepare" << true);
return makeCommandOplogEntryWithSessionInfoAndStmtIds(
opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, OpTime());
}
+ auto makeCommitOplogEntry(const OpTime opTime,
+ const Timestamp commitTs,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const OpTime prevOpTime) {
+ auto command = BSON("commitTransaction" << 1 << "commitTimestamp" << commitTs);
+
+ return makeCommandOplogEntryWithSessionInfoAndStmtIds(
+ opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, prevOpTime);
+ }
+
+ auto makeAbortOplogEntry(const OpTime opTime,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ const OpTime prevOpTime) {
+ auto command = BSON("abortTransaction" << 1);
+
+ return makeCommandOplogEntryWithSessionInfoAndStmtIds(
+ opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, prevOpTime);
+ }
+
int filterConfigTransactionsEntryFromWriterVectors(WriterVectors& writerVectors) {
int txnTableOps = 0;
for (auto& vector : writerVectors) {
@@ -4870,7 +4891,7 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) {
// changes.
const int kNumEntries = _writerPool->getStats().options.maxThreads * 1000;
- std::vector<OplogEntry> ops;
+ std::vector<OplogEntry> prepareOps;
std::vector<BSONObj> cruds1;
std::vector<BSONObj> cruds2;
cruds1.reserve(kNumEntries);
@@ -4886,28 +4907,29 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) {
<< BSON("_id" << i + kNumEntries)));
}
- ops.push_back(makePrepareOplogForCrudBatch(cruds1, {Timestamp(1, 1), 1}, _lsid1, _txnNum1));
- ops.push_back(makePrepareOplogForCrudBatch(cruds2, {Timestamp(1, 2), 1}, _lsid2, _txnNum2));
+ prepareOps.push_back(makePrepareOplogEntry(cruds1, {Timestamp(1, 1), 1}, _lsid1, _txnNum1));
+ prepareOps.push_back(makePrepareOplogEntry(cruds2, {Timestamp(1, 2), 1}, _lsid2, _txnNum2));
- WriterVectors writerVectors(_writerPool->getStats().options.maxThreads);
- std::vector<std::vector<OplogEntry>> derivedOps;
- _applier->fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+ WriterVectors prepareWriterVectors(_writerPool->getStats().options.maxThreads);
+ std::vector<std::vector<OplogEntry>> derivedPrepareOps;
+ _applier->fillWriterVectors_forTest(
+ _opCtx.get(), &prepareOps, &prepareWriterVectors, &derivedPrepareOps);
// Verify the config.transactions collection got one entry for each prepared transaction.
- int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(writerVectors);
+ int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(prepareWriterVectors);
ASSERT_EQ(2, txnTableOps);
// Verify each writer has been assigned operations for both prepared transactions.
- for (auto& writer : writerVectors) {
+ for (auto& writer : prepareWriterVectors) {
ASSERT_EQ(2, writer.size());
- ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[0].instruction);
+ ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[0].instruction);
ASSERT_TRUE(writer[0]->shouldPrepare());
ASSERT_NE(boost::none, writer[0].subSession);
ASSERT_NE(boost::none, writer[0].splitPrepareOps);
ASSERT_FALSE(writer[0].splitPrepareOps->empty());
- ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[1].instruction);
+ ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[1].instruction);
ASSERT_TRUE(writer[1]->shouldPrepare());
ASSERT_NE(boost::none, writer[1].subSession);
ASSERT_NE(boost::none, writer[1].splitPrepareOps);
@@ -4915,35 +4937,114 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) {
ASSERT_NE(writer[0].subSession->getSessionId(), writer[1].subSession->getSessionId());
}
+
+ // Test that applying a commitTransaction or abortTransaction entry in the next batch will
+ // correctly split the entry and add them into those writer vectors that previously got
+ // assigned the prepare entry.
+ std::vector<OplogEntry> commitOps;
+ commitOps.push_back(makeCommitOplogEntry(
+ {Timestamp(3, 1), 1}, Timestamp(2, 1), _lsid1, _txnNum1, prepareOps[0].getOpTime()));
+ commitOps.push_back(
+ makeAbortOplogEntry({Timestamp(3, 2), 1}, _lsid2, _txnNum2, prepareOps[1].getOpTime()));
+
+ WriterVectors commitWriterVectors(_writerPool->getStats().options.maxThreads);
+ std::vector<std::vector<OplogEntry>> derivedCommitOps;
+ _applier->fillWriterVectors_forTest(
+ _opCtx.get(), &commitOps, &commitWriterVectors, &derivedCommitOps);
+
+ // Verify the config.transactions collection got one entry for each commit/abort op.
+ txnTableOps = filterConfigTransactionsEntryFromWriterVectors(commitWriterVectors);
+ ASSERT_EQ(2, txnTableOps);
+
+ // Verify each writer has been assigned a split commit and abort op.
+ for (size_t i = 0; i < commitWriterVectors.size(); ++i) {
+ auto& commitWriter = commitWriterVectors[i];
+ auto& prepareWriter = prepareWriterVectors[i];
+
+ ASSERT_EQ(2, commitWriter.size());
+ ASSERT_EQ(2, prepareWriter.size());
+
+ ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp, commitWriter[0].instruction);
+ ASSERT_TRUE(commitWriter[0]->isPreparedCommit());
+ ASSERT_EQ(prepareWriter[0].subSession->getSessionId(),
+ commitWriter[0].subSession->getSessionId());
+ ASSERT_EQ(boost::none, commitWriter[0].splitPrepareOps);
+
+ ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp, commitWriter[1].instruction);
+ ASSERT_EQ(prepareWriter[1].subSession->getSessionId(),
+ commitWriter[1].subSession->getSessionId());
+ ASSERT_NE(boost::none, commitWriter[1].subSession);
+ ASSERT_EQ(boost::none, commitWriter[1].splitPrepareOps);
+
+ ASSERT_NE(commitWriter[0].subSession->getSessionId(),
+ commitWriter[1].subSession->getSessionId());
+ }
}
TEST_F(PreparedTxnSplitTest, SingleEmptyPrepareTransaction) {
RAIIServerParameterControllerForTest controller("featureFlagApplyPreparedTxnsInParallel", true);
- std::vector<OplogEntry> ops;
- ops.push_back(makePrepareOplogForCrudBatch({}, {Timestamp(1, 1), 1}, _lsid1, _txnNum1));
+ std::vector<OplogEntry> prepareOps;
+ prepareOps.push_back(makePrepareOplogEntry({}, {Timestamp(1, 1), 1}, _lsid1, _txnNum1));
- WriterVectors writerVectors(_writerPool->getStats().options.maxThreads);
- std::vector<std::vector<OplogEntry>> derivedOps;
- _applier->fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps);
+ WriterVectors prepareWriterVectors(_writerPool->getStats().options.maxThreads);
+ std::vector<std::vector<OplogEntry>> derivedPrepareOps;
+ _applier->fillWriterVectors_forTest(
+ _opCtx.get(), &prepareOps, &prepareWriterVectors, &derivedPrepareOps);
// Verify the config.transactions collection got an entry for the empty prepared transaction.
- int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(writerVectors);
+ int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(prepareWriterVectors);
ASSERT_EQ(1, txnTableOps);
- // Verify exactly writer has been assigned operation for the empty prepared transaction.
- int count = std::count_if(writerVectors.begin(), writerVectors.end(), [](auto& writer) {
- if (writer.size() != 1) {
- return false;
+ // Verify exactly one writer has been assigned operation for the empty prepared transaction.
+ boost::optional<uint32_t> writerId;
+ for (size_t i = 0; i < prepareWriterVectors.size(); ++i) {
+ auto& writer = prepareWriterVectors[i];
+ ASSERT_LTE(writer.size(), 1);
+ if (writer.size() == 1) {
+ ASSERT_EQ(boost::none, writerId);
+ writerId = i;
+ ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[0].instruction);
+ ASSERT_TRUE(writer[0]->shouldPrepare());
+ ASSERT_NE(boost::none, writer[0].subSession);
+ ASSERT_NE(boost::none, writer[0].splitPrepareOps);
+ ASSERT_TRUE(writer[0].splitPrepareOps->empty());
}
- ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[0].instruction);
- ASSERT_TRUE(writer[0]->shouldPrepare());
- ASSERT_NE(boost::none, writer[0].subSession);
- ASSERT_NE(boost::none, writer[0].splitPrepareOps);
- ASSERT_TRUE(writer[0].splitPrepareOps->empty());
- return true;
- });
- ASSERT_EQ(1, count);
+ }
+
+ // Test that applying a commitTransaction in the next batch will correctly split the entry
+ // and add it into those writer vectors that previously got assigned the prepare entry.
+ std::vector<OplogEntry> commitOps;
+ commitOps.push_back(makeCommitOplogEntry(
+ {Timestamp(3, 1), 1}, Timestamp(2, 1), _lsid1, _txnNum1, prepareOps[0].getOpTime()));
+
+ WriterVectors commitWriterVectors(_writerPool->getStats().options.maxThreads);
+ std::vector<std::vector<OplogEntry>> derivedCommitOps;
+ _applier->fillWriterVectors_forTest(
+ _opCtx.get(), &commitOps, &commitWriterVectors, &derivedCommitOps);
+
+ // Verify the config.transactions collection got an entry for the commit op.
+ txnTableOps = filterConfigTransactionsEntryFromWriterVectors(commitWriterVectors);
+ ASSERT_EQ(1, txnTableOps);
+
+ // Verify exactly one writer has been assigned a split commit op.
+ for (size_t i = 0; i < commitWriterVectors.size(); ++i) {
+ auto& commitwriter = commitWriterVectors[i];
+ auto& prepareWriter = prepareWriterVectors[i];
+ ASSERT_LTE(commitwriter.size(), 1);
+ ASSERT_LTE(prepareWriter.size(), 1);
+ ASSERT_EQ(prepareWriter.size(), commitwriter.size());
+
+ if (commitwriter.size() == 1) {
+ ASSERT_EQ(writerId.get(), i);
+ ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp,
+ commitwriter[0].instruction);
+ ASSERT_TRUE(commitwriter[0]->isPreparedCommit());
+ ASSERT_EQ(prepareWriter[0].subSession->getSessionId(),
+ commitwriter[0].subSession->getSessionId());
+ ASSERT_EQ(boost::none, commitwriter[0].splitPrepareOps);
+ }
+ }
}
class GlobalIndexTest : public OplogApplierImplTest {
diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp
index 2f8bed62443..9a7fddb9503 100644
--- a/src/mongo/db/repl/oplog_applier_utils.cpp
+++ b/src/mongo/db/repl/oplog_applier_utils.cpp
@@ -207,47 +207,84 @@ void OplogApplierUtils::addDerivedPrepares(
std::vector<std::vector<ApplierOperation>>* writerVectors,
CachedCollectionProperties* collPropertiesCache) {
- uint32_t bufSplits = 0;
- std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size());
-
- // Add the ops in the prepared transaction to the buffered writer vectors.
- for (auto&& op : *derivedOps) {
- auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache);
- bufSplits += bufWriterVectors[writerId].size() == 1;
- }
-
- // Create the split sessions and track them with the the session of this prepare entry.
- uint32_t realSplits = std::max<uint32_t>(bufSplits, 1);
+ // Get the SplitPrepareSessionManager to be used to create split sessions.
auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
- const auto& splitSessions = splitSessManager->splitSession(
- *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), realSplits);
- invariant(splitSessions.size() == realSplits);
+ auto splitSessFunc = [=](const std::vector<uint32_t>& writerIds) -> const auto& {
+ const auto& sessions = splitSessManager->splitSession(
+ *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), writerIds);
+ invariant(sessions.size() == writerIds.size());
+ return sessions;
+ };
- // For empty (read-only) prepares, the namespace of the prepare oplog entry (admin.$cmd)
- // will be used to decide which writer vector to add to.
- if (!bufSplits) {
+ // For empty (read-only) prepares, we use the namespace of the original prepare oplog entry
+ // (admin.$cmd) to decide which writer thread to apply it, and assigned it a split session.
+ //
+ // The reason that we also split an empty prepare instead of treating it as some standalone
+ // prepare op (as the prepares in initial sync or recovery mode) is so that we can keep a
+ // logical invariant that all prepares in secondary mode are split, and thus we can apply
+ // empty and non-empty prepares in the same way.
+ if (derivedOps->empty()) {
auto writerId = getWriterId(opCtx, prepareOp, collPropertiesCache, writerVectors->size());
+ const auto& sessionInfos = splitSessFunc({writerId});
(*writerVectors)[writerId].emplace_back(prepareOp,
- ApplicationInstruction::applySplitPrepareOps,
- splitSessions[0],
+ ApplicationInstruction::applySplitPrepareOp,
+ sessionInfos[0].session,
std::vector<const OplogEntry*>{});
return;
}
- // For each writer thread that has been assigned ops for this transaction, acquire a
- // split session and transfer the ops to the real writer vector.
+ // For non-empty prepares, the namespace of each derived op in the transaction is used to
+ // decide which writer thread to apply it. We first add all the derived ops to a buffer
+ // writer vector in order to get all the writer threads needed to apply this transaction.
+ // We then acquire that number of split sessions and assign each writer thread a unique
+ // split session when moving the ops to the real writer vector.
+ std::set<uint32_t> writerIds;
+ std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size());
+ for (auto&& op : *derivedOps) {
+ auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache);
+ writerIds.emplace(writerId);
+ }
+
+ const auto& sessionInfos = splitSessFunc({writerIds.begin(), writerIds.end()});
for (size_t i = 0, j = 0; i < bufWriterVectors.size(); ++i) {
auto& bufWriter = bufWriterVectors[i];
auto& realWriter = (*writerVectors)[i];
if (!bufWriter.empty()) {
realWriter.emplace_back(prepareOp,
- ApplicationInstruction::applySplitPrepareOps,
- splitSessions[j++],
+ ApplicationInstruction::applySplitPrepareOp,
+ sessionInfos[j++].session,
std::move(bufWriter));
}
}
}
+void OplogApplierUtils::addDerivedCommitsOrAborts(
+ OperationContext* opCtx,
+ OplogEntry* commitOrAbortOp,
+ std::vector<std::vector<ApplierOperation>>* writerVectors,
+ CachedCollectionProperties* collPropertiesCache) {
+
+ auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
+ const auto& sessionInfos = splitSessManager->getSplitSessions(*commitOrAbortOp->getSessionId(),
+ *commitOrAbortOp->getTxnNumber());
+
+ // When this commit refers to a non-split prepare, it means the transaction was
+ // prepared when the node was primary or during inital sync/recovery. In this
+ // case we do not split the commit and just add it as-is to the writer vector.
+ if (!sessionInfos.has_value()) {
+ addToWriterVector(opCtx, commitOrAbortOp, writerVectors, collPropertiesCache);
+ return;
+ }
+
+ // When this commit refers to a split prepare, we split the commit and add them
+ // to the writers that have been assigned split prepare ops.
+ for (const auto& sessInfo : *sessionInfos) {
+ auto& writer = (*writerVectors)[sessInfo.requesterId];
+ writer.emplace_back(
+ commitOrAbortOp, ApplicationInstruction::applySplitCommitOrAbortOp, sessInfo.session);
+ }
+}
+
NamespaceString OplogApplierUtils::parseUUIDOrNs(OperationContext* opCtx,
const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();
diff --git a/src/mongo/db/repl/oplog_applier_utils.h b/src/mongo/db/repl/oplog_applier_utils.h
index 19bf846c45f..b2658ddc2b5 100644
--- a/src/mongo/db/repl/oplog_applier_utils.h
+++ b/src/mongo/db/repl/oplog_applier_utils.h
@@ -115,6 +115,49 @@ public:
/**
* Adds a set of derived prepared transaction operations to writerVectors.
+ *
+ * The prepareOp and derivedOps are inputs that we use to generate ApplierOperation's to be
+ * added to the writerVectors. The derivedOps contains all the CRUD ops inside the applyOps
+ * part of the prepareOp. When this function finishes the writerVectors may look like this:
+ *
+ * ========================== for non-empty prepared transaction ==========================
+ * writer vector 1: [ ]
+ * writer vector 2: [
+ * ApplierOperation{
+ * op: prepareOp,
+ * instruction: applySplitPrepareOp,
+ * subSession: <split_session_id_1>,
+ * splitPrepareOps: [ crud_op_1, crud_op_3 ],
+ * },
+ * ]
+ * writer vector 3: [
+ * // This op should already exist in the writerVector prior to this function call.
+ * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry },
+ * ]
+ * writer vector 4: [
+ * ApplierOperation{
+ * op: prepareOp,
+ * instruction: applySplitPrepareOp,
+ * subSession: <split_session_id_2>,
+ * splitPrepareOps: [ crud_op_2, crud_op_4 ],
+ * },
+ * ]
+ * ============================ for empty prepared transaction ============================
+ * writer vector 1: [ ]
+ * writer vector 2: [
+ * // This op should already exist in the writerVector prior to this function call.
+ * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry },
+ * ]
+ * writer vector 3: [ ]
+ * writer vector 4: [
+ * // The splitPrepareOps list is made empty, which we can still correctly apply.
+ * ApplierOperation{
+ * op: <original_prepare_op>,
+ * instruction: applySplitPrepareOp,
+ * subSession: <split_session_id_1>,
+ * splitPrepareOps: [ ],
+ * },
+ * ]
*/
static void addDerivedPrepares(OperationContext* opCtx,
OplogEntry* prepareOp,
@@ -123,9 +166,43 @@ public:
CachedCollectionProperties* collPropertiesCache);
/**
+ * Adds commit or abort transaction operations to the writerVectors.
+ *
+ * The commitOrAbortOp is the input that we use to generate ApplierOperation's to be added
+ * to those writerVectors that previously got assigned the prepare ops. When this function
+ * finishes the writerVectors may look like this:
+ *
+ * writer vector 1: [ ]
+ * writer vector 2: [
+ * ApplierOperation{
+ * op: commitOrAbortOp,
+ * instruction: applySplitCommitOrAbortOp,
+ * subSession: <split_session_id_1>,
+ * splitPrepareOps: [ ],
+ * },
+ * ]
+ * writer vector 3: [
+ * // This op should already exist in the writerVector prior to this function call.
+ * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry },
+ * ]
+ * writer vector 4: [
+ * ApplierOperation{
+ * op: commitOrAbortOp,
+ * instruction: applySplitCommitOrAbortOp,
+ * subSession: <split_session_id_2>,
+ * splitPrepareOps: [ ],
+ * },
+ * ]
+ */
+ static void addDerivedCommitsOrAborts(OperationContext* opCtx,
+ OplogEntry* commitOrAbortOp,
+ std::vector<std::vector<ApplierOperation>>* writerVectors,
+ CachedCollectionProperties* collPropertiesCache);
+
+ /**
* Returns the namespace string for this oplogEntry; if it has a UUID it looks up the
- * corresponding namespace and returns it, otherwise it returns the oplog entry 'nss'. If there
- * is a UUID and no namespace with that ID is found, throws NamespaceNotFound.
+ * corresponding namespace and returns it, otherwise it returns the oplog entry 'nss'. If
+ * there is a UUID and no namespace with that ID is found, throws NamespaceNotFound.
*/
static NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry);
diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp
index 4d6fae466cb..33dfc7b514c 100644
--- a/src/mongo/db/repl/oplog_entry.cpp
+++ b/src/mongo/db/repl/oplog_entry.cpp
@@ -791,6 +791,10 @@ bool OplogEntry::isPreparedCommit() const {
return _entry.isPreparedCommit();
}
+bool OplogEntry::isPreparedAbort() const {
+ return _entry.isPreparedAbort();
+}
+
bool OplogEntry::isTerminalApplyOps() const {
return _entry.isTerminalApplyOps();
}
diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h
index eede3f30236..becd58a7aac 100644
--- a/src/mongo/db/repl/oplog_entry.h
+++ b/src/mongo/db/repl/oplog_entry.h
@@ -538,6 +538,13 @@ public:
}
/**
+ * Returns if this is a prepared 'abortTransaction' oplog entry.
+ */
+ bool isPreparedAbort() const {
+ return getCommandType() == DurableOplogEntry::CommandType::kAbortTransaction;
+ }
+
+ /**
* Returns whether the oplog entry represents an applyOps which is a self-contained atomic
* operation, or the last applyOps of an unprepared transaction, as opposed to part of a
* prepared transaction or a non-final applyOps in a transaction.
@@ -734,6 +741,7 @@ public:
bool isPartialTransaction() const;
bool isEndOfLargeTransaction() const;
bool isPreparedCommit() const;
+ bool isPreparedAbort() const;
bool isTerminalApplyOps() const;
bool isSingleOplogEntryTransaction() const;
bool isSingleOplogEntryTransactionWithCommand() const;
diff --git a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h
index 669e70a3c0c..20f352d6d4b 100644
--- a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h
+++ b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h
@@ -38,7 +38,8 @@ namespace repl {
enum class ApplicationInstruction {
applyOplogEntry,
- applySplitPrepareOps,
+ applySplitPrepareOp,
+ applySplitCommitOrAbortOp,
};
struct ApplierOperation {
diff --git a/src/mongo/db/repl/split_prepare_session_manager.cpp b/src/mongo/db/repl/split_prepare_session_manager.cpp
index 7734defb25c..40fa56416f6 100644
--- a/src/mongo/db/repl/split_prepare_session_manager.cpp
+++ b/src/mongo/db/repl/split_prepare_session_manager.cpp
@@ -35,28 +35,32 @@ namespace repl {
SplitPrepareSessionManager::SplitPrepareSessionManager(InternalSessionPool* sessionPool)
: _sessionPool(sessionPool) {}
-const std::vector<PooledSession>& SplitPrepareSessionManager::splitSession(
- const LogicalSessionId& sessionId, TxnNumber txnNumber, uint32_t numSplits) {
+const std::vector<SplitSessionInfo>& SplitPrepareSessionManager::splitSession(
+ const LogicalSessionId& sessionId,
+ TxnNumber txnNumber,
+ const std::vector<uint32_t>& requesterIds) {
+
+ auto numSplits = requesterIds.size();
invariant(numSplits > 0);
stdx::lock_guard<Latch> lk(_mutex);
auto [it, succ] =
- _splitSessionMap.try_emplace(sessionId, txnNumber, std::vector<PooledSession>());
+ _splitSessionMap.try_emplace(sessionId, txnNumber, std::vector<SplitSessionInfo>());
// The session must not be split before.
invariant(succ);
- auto& sessions = it->second.second;
- sessions.reserve(numSplits);
+ auto& sessionInfos = it->second.second;
+ sessionInfos.reserve(numSplits);
- for (uint32_t i = 0; i < numSplits; ++i) {
- sessions.push_back(_sessionPool->acquireSystemSession());
+ for (auto reqId : requesterIds) {
+ sessionInfos.emplace_back(_sessionPool->acquireSystemSession(), reqId);
}
- return sessions;
+ return sessionInfos;
}
-boost::optional<const std::vector<PooledSession>&> SplitPrepareSessionManager::getSplitSessions(
+boost::optional<const std::vector<SplitSessionInfo>&> SplitPrepareSessionManager::getSplitSessions(
const LogicalSessionId& sessionId, TxnNumber txnNumber) const {
stdx::lock_guard<Latch> lk(_mutex);
@@ -98,9 +102,9 @@ void SplitPrepareSessionManager::releaseSplitSessions(const LogicalSessionId& se
// The txnNumber must not change after the session was split.
invariant(txnNumber == it->second.first);
- auto& sessions = it->second.second;
- for (const auto& sess : sessions) {
- _sessionPool->release(sess);
+ auto& sessionInfos = it->second.second;
+ for (const auto& sessInfo : sessionInfos) {
+ _sessionPool->release(sessInfo.session);
}
_splitSessionMap.erase(it);
diff --git a/src/mongo/db/repl/split_prepare_session_manager.h b/src/mongo/db/repl/split_prepare_session_manager.h
index 6a53763e73b..7732ff8853e 100644
--- a/src/mongo/db/repl/split_prepare_session_manager.h
+++ b/src/mongo/db/repl/split_prepare_session_manager.h
@@ -39,6 +39,19 @@ namespace repl {
using PooledSession = InternalSessionPool::Session;
/**
+ * This struct contains information for a split session, including the split
+ * session itself and the id of the requester who wants to acquire the split
+ * session, which is usually a writer thread from the oplog applier.
+ */
+struct SplitSessionInfo {
+ PooledSession session;
+ uint32_t requesterId;
+
+ SplitSessionInfo(PooledSession&& sess, uint32_t reqId)
+ : session(std::move(sess)), requesterId(reqId) {}
+};
+
+/**
* This class manages the sessions for split prepared transactions.
*
* Prepared transactions are split and applied in parallel on secondaries, and this class is
@@ -57,22 +70,35 @@ public:
/**
* Creates split sessions for the given top-level session and track the mapping.
*
+ * The txnNumber is the txnNumber of the top-level session that needs to be split. It
+ * servers as a sanity check: onces a session is split, it can not be split again for
+ * another txnNumber until it releases the existing split sessions.
+ *
+ * The requesterIds is a sorted list of requesters (usually writer threads from the
+ * oplog applier), each of whom wants to acquire a split session.
+ *
* Asserts if the given session is already split.
*/
- const std::vector<PooledSession>& splitSession(const LogicalSessionId& sessionId,
- TxnNumber txnNumber,
- uint32_t numSplits);
+ const std::vector<SplitSessionInfo>& splitSession(const LogicalSessionId& sessionId,
+ TxnNumber txnNumber,
+ const std::vector<uint32_t>& requesterIds);
/**
* Returns a vector of split sessions for the given top-level session, or nothing if
* the given session has not been split.
+ *
+ * The txnNumber servers a sanity check to make sure it is the same as the one being
+ * used for splitSession().
*/
- boost::optional<const std::vector<PooledSession>&> getSplitSessions(
+ boost::optional<const std::vector<SplitSessionInfo>&> getSplitSessions(
const LogicalSessionId& sessionId, TxnNumber txnNumber) const;
/**
* Returns true if the given session has been split, or false otherwise. This can be
* used as an alternative to getSplitSessionIds() when the result is not needed.
+ *
+ * The txnNumber servers a sanity check to make sure it is the same as the one being
+ * used for splitSession().
*/
bool isSessionSplit(const LogicalSessionId& sessionId, TxnNumber txnNumber) const;
@@ -80,6 +106,9 @@ public:
* Releases all the split sessions of the give top-level session into the session pool
* and stops tracking their mapping.
*
+ * The txnNumber servers a sanity check to make sure it is the same as the one being
+ * used for splitSession().
+ *
* Asserts if the given session is not split.
*/
void releaseSplitSessions(const LogicalSessionId& sessionId, TxnNumber txnNumber);
@@ -92,7 +121,7 @@ private:
InternalSessionPool* _sessionPool;
// A map to track top-level sessions and their splits.
- LogicalSessionIdMap<std::pair<TxnNumber, std::vector<PooledSession>>> _splitSessionMap;
+ LogicalSessionIdMap<std::pair<TxnNumber, std::vector<SplitSessionInfo>>> _splitSessionMap;
};
} // namespace repl
diff --git a/src/mongo/db/repl/split_prepare_session_manager_test.cpp b/src/mongo/db/repl/split_prepare_session_manager_test.cpp
index 8e475c2651a..91f2f06d45b 100644
--- a/src/mongo/db/repl/split_prepare_session_manager_test.cpp
+++ b/src/mongo/db/repl/split_prepare_session_manager_test.cpp
@@ -60,25 +60,38 @@ TEST_F(SplitPrepareSessionManagerTest, SplitSessionsBasic) {
const auto& topLevelSessId1 = makeSystemLogicalSessionId();
const auto& topLevelSessId2 = makeSystemLogicalSessionId();
const TxnNumber txnNumber1(100), txnNumber2(200);
- const int numSplits1 = 1, numSplits2 = 5;
-
- const auto& sessions1 =
- _splitSessManager->splitSession(topLevelSessId1, txnNumber1, numSplits1);
- const auto& sessions2 =
- _splitSessManager->splitSession(topLevelSessId2, txnNumber2, numSplits2);
-
- ASSERT_EQ(numSplits1, sessions1.size());
- ASSERT_EQ(numSplits2, sessions2.size());
-
- std::vector<LogicalSessionId> sessionIds1(sessions1.size()), sessionIds2(sessions2.size());
- auto sessionToId = [](const PooledSession& session) { return session.getSessionId(); };
- std::transform(sessions1.begin(), sessions1.end(), sessionIds1.begin(), sessionToId);
- std::transform(sessions2.begin(), sessions2.end(), sessionIds2.begin(), sessionToId);
+ const std::vector<uint32_t> requesterIds1{4};
+ const std::vector<uint32_t> requesterIds2{1, 3, 5, 7, 9};
+ const size_t numSplits1 = requesterIds1.size();
+ const size_t numSplits2 = requesterIds2.size();
+
+ const auto& sessInfos1 =
+ _splitSessManager->splitSession(topLevelSessId1, txnNumber1, requesterIds1);
+ const auto& sessInfos2 =
+ _splitSessManager->splitSession(topLevelSessId2, txnNumber2, requesterIds2);
+
+ ASSERT_EQ(numSplits1, sessInfos1.size());
+ ASSERT_EQ(numSplits2, sessInfos2.size());
+
+ std::vector<LogicalSessionId> sessionIds1(numSplits1), sessionIds2(numSplits2);
+ std::vector<uint32_t> returnedIds1(numSplits1), returnedIds2(numSplits2);
+ auto sessInfoToSessId = [](const SplitSessionInfo& sessInfo) {
+ return sessInfo.session.getSessionId();
+ };
+ auto sessInfoToReqId = [](const SplitSessionInfo& sessInfo) { return sessInfo.requesterId; };
+ std::transform(sessInfos1.begin(), sessInfos1.end(), sessionIds1.begin(), sessInfoToSessId);
+ std::transform(sessInfos2.begin(), sessInfos2.end(), sessionIds2.begin(), sessInfoToSessId);
+ std::transform(sessInfos1.begin(), sessInfos1.end(), returnedIds1.begin(), sessInfoToReqId);
+ std::transform(sessInfos2.begin(), sessInfos2.end(), returnedIds2.begin(), sessInfoToReqId);
// Make sure the split sessions have unique sessionIds.
ASSERT_EQ(numSplits1, LogicalSessionIdSet(sessionIds1.begin(), sessionIds1.end()).size());
ASSERT_EQ(numSplits2, LogicalSessionIdSet(sessionIds2.begin(), sessionIds2.end()).size());
+ // Make sure the returned requesterIds match the original ones.
+ ASSERT_EQ(requesterIds1, returnedIds1);
+ ASSERT_EQ(requesterIds2, returnedIds2);
+
ASSERT_EQ(numSplits1, _splitSessManager->getSplitSessions(topLevelSessId1, txnNumber1)->size());
ASSERT_EQ(numSplits2, _splitSessManager->getSplitSessions(topLevelSessId2, txnNumber2)->size());
ASSERT_EQ(true, _splitSessManager->isSessionSplit(topLevelSessId1, txnNumber1));
@@ -96,13 +109,15 @@ TEST_F(SplitPrepareSessionManagerTest, SplitSessionsBasic) {
DEATH_TEST_F(SplitPrepareSessionManagerTest, SplitAlreadySplitSessions, "invariant") {
const auto& topLevelSessId = makeSystemLogicalSessionId();
const TxnNumber txnNumber(100);
- const int numSplits = 3;
+ const std::vector<uint32_t> requesterIds{2, 4, 6};
+ const size_t numSplits = requesterIds.size();
- const auto& sessions = _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits);
- ASSERT_EQ(numSplits, sessions.size());
+ const auto& sessInfos =
+ _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds);
+ ASSERT_EQ(numSplits, sessInfos.size());
// Attempting to split an already split top-level session should fail.
- _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits + 1);
+ _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds);
}
DEATH_TEST_F(SplitPrepareSessionManagerTest, ReleaseNonSplitSessions, "invariant") {
@@ -115,10 +130,12 @@ DEATH_TEST_F(SplitPrepareSessionManagerTest, ReleaseNonSplitSessions, "invariant
DEATH_TEST_F(SplitPrepareSessionManagerTest, ChangeTxnNumberAfterSessionSplit, "invariant") {
const auto& topLevelSessId = makeSystemLogicalSessionId();
const TxnNumber txnNumber(100);
- const int numSplits = 3;
+ const std::vector<uint32_t> requesterIds{2, 4, 6};
+ const size_t numSplits = requesterIds.size();
- const auto& sessions = _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits);
- ASSERT_EQ(numSplits, sessions.size());
+ const auto& sessInfos =
+ _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds);
+ ASSERT_EQ(numSplits, sessInfos.size());
// Attempting to release a top-level session with different txnNumber should fail.
_splitSessManager->releaseSplitSessions(topLevelSessId, txnNumber + 1);
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index b408894fcff..ca711476df9 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -2020,7 +2020,7 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary(
const Timestamp& commitTimestamp,
const Timestamp& durableTimestamp) {
- for (const repl::PooledSession& session :
+ for (const auto& sessInfos :
splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) {
auto splitClientOwned = parentOpCtx->getServiceContext()->makeClient("tempSplitClient");
@@ -2030,6 +2030,7 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary(
std::unique_ptr<MongoDSessionCatalog::Session> checkedOutSession;
repl::UnreplicatedWritesBlock notReplicated(splitOpCtx.get());
+ const auto& session = sessInfos.session;
splitOpCtx->setLogicalSessionId(session.getSessionId());
splitOpCtx->setTxnNumber(session.getTxnNumber());
splitOpCtx->setInMultiDocumentTransaction();
diff --git a/src/mongo/db/transaction/transaction_participant_test.cpp b/src/mongo/db/transaction/transaction_participant_test.cpp
index f5b53a25d60..60ccf9c4b76 100644
--- a/src/mongo/db/transaction/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction/transaction_participant_test.cpp
@@ -5980,19 +5980,18 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
// TxnResources start in the "stashed" state.
userTxnParticipant.unstashTransactionResources(opCtx, "crud ops");
- // Hold the collection lock/datastructure such that it can be released prior to rollback.
+ // Hold the collection lock/data structure such that it can be released prior to rollback.
boost::optional<AutoGetCollection> userColl;
userColl.emplace(opCtx, kNss, LockMode::MODE_IX);
// We split our user session into 2 split sessions.
- const int numSplits = 2;
+ const std::vector<uint32_t> requesterIds{1, 3};
auto* splitPrepareManager =
repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
- const std::vector<InternalSessionPool::Session>& splitSessions =
- splitPrepareManager->splitSession(
- opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), numSplits);
+ const auto& splitSessInfos = splitPrepareManager->splitSession(
+ opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), requesterIds);
// Insert an `_id: 1` document.
- callUnderSplitSession(splitSessions[0], [nullOpDbg](OperationContext* opCtx) {
+ callUnderSplitSession(splitSessInfos[0].session, [nullOpDbg](OperationContext* opCtx) {
AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
ASSERT_OK(
collection_internal::insertDocument(opCtx,
@@ -6002,7 +6001,7 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
});
// Insert an `_id: 2` document.
- callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) {
+ callUnderSplitSession(splitSessInfos[1].session, [nullOpDbg](OperationContext* opCtx) {
AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
ASSERT_OK(
collection_internal::insertDocument(opCtx,
@@ -6013,7 +6012,7 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
// Update `2` to increment its `value` to 2. This must be done in the same split session as the
// insert.
- callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) {
+ callUnderSplitSession(splitSessInfos[1].session, [nullOpDbg](OperationContext* opCtx) {
AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX);
Helpers::update(
opCtx, userColl->ns(), BSON("_id" << 2), BSON("$inc" << BSON("value" << 1)));
@@ -6024,11 +6023,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
// `UnreplicatedWritesBlock` and explicitly pass in the prepare OpTime.
const Timestamp prepTs = startTs;
const repl::OpTime prepOpTime(prepTs, 1);
- callUnderSplitSession(splitSessions[0], [prepOpTime](OperationContext* opCtx) {
+ callUnderSplitSession(splitSessInfos[0].session, [prepOpTime](OperationContext* opCtx) {
auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant.prepareTransaction(opCtx, prepOpTime);
});
- callUnderSplitSession(splitSessions[1], [prepOpTime](OperationContext* opCtx) {
+ callUnderSplitSession(splitSessInfos[1].session, [prepOpTime](OperationContext* opCtx) {
auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant.prepareTransaction(opCtx, prepOpTime);
});
@@ -6114,11 +6113,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
// Rather than testing the implementation, we'll assert on the weakest necessary state. A
// split `config.transactions` document may or may not exist. If it exists, it must be
// in the "committed" state.
- for (auto idx = 0; idx < numSplits; ++idx) {
+ for (const auto& sessInfo : splitSessInfos) {
BSONObj splitTxnObj = Helpers::findOneForTesting(
opCtx,
configTransactions.getCollection(),
- BSON("_id.id" << splitSessions[idx].getSessionId().getId()),
+ BSON("_id.id" << sessInfo.session.getSessionId().getId()),
!invariantOnError);
if (!splitTxnObj.isEmpty()) {
assertSessionState(splitTxnObj, DurableTxnStateEnum::kCommitted);
@@ -6180,11 +6179,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) {
// Rather than testing the implementation, we'll assert on the weakest necessary state. A split
// `config.transactions` document may or may not exist. If it exists, it must not* be in the
// "prepared" state.
- for (auto idx = 0; idx < numSplits; ++idx) {
+ for (const auto& sessInfo : splitSessInfos) {
BSONObj splitTxnObj =
Helpers::findOneForTesting(opCtx,
configTransactions.getCollection(),
- BSON("_id.id" << splitSessions[idx].getSessionId().getId()),
+ BSON("_id.id" << sessInfo.session.getSessionId().getId()),
!invariantOnError);
if (!splitTxnObj.isEmpty()) {
assertNotInSessionState(splitTxnObj, DurableTxnStateEnum::kPrepared);