diff options
author | Wenbin Zhu <wenbin.zhu@mongodb.com> | 2023-02-09 00:26:29 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2023-02-09 15:11:30 +0000 |
commit | e5ee746f9e459686d172ae99204f484e85652d49 (patch) | |
tree | e1b5b682ec18e74fbf1e640e75557d448373fbe3 | |
parent | 1d79792d701ab899165a7c56108633bbeac3c924 (diff) | |
download | mongo-e5ee746f9e459686d172ae99204f484e85652d49.tar.gz |
SERVER-72762 Split commit and abort entries during secondary oplog application.
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl_test.cpp | 167 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_utils.cpp | 83 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_applier_utils.h | 81 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry.h | 8 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog_entry_or_grouped_inserts.h | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/split_prepare_session_manager.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/repl/split_prepare_session_manager.h | 39 | ||||
-rw-r--r-- | src/mongo/db/repl/split_prepare_session_manager_test.cpp | 59 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/transaction/transaction_participant_test.cpp | 27 |
12 files changed, 405 insertions, 125 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 35fa85e2ced..885d9f923cd 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -116,8 +116,8 @@ void _addOplogChainOpsToWriterVectors(OperationContext* opCtx, std::vector<OplogEntry*>* partialTxnList, std::vector<std::vector<OplogEntry>>* derivedOps, OplogEntry* op, - CachedCollectionProperties* collPropertiesCache, - std::vector<std::vector<ApplierOperation>>* writerVectors) { + std::vector<std::vector<ApplierOperation>>* writerVectors, + CachedCollectionProperties* collPropertiesCache) { auto [txnOps, shouldSerialize] = readTransactionOperationsFromOplogChainAndCheckForCommands(opCtx, *op, *partialTxnList); derivedOps->emplace_back(std::move(txnOps)); @@ -695,9 +695,8 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( std::vector<std::vector<OplogEntry>>* derivedOps, SessionUpdateTracker* sessionUpdateTracker) noexcept { - // Caches partial transaction operations. Each map entry - // contains a cumulative list of operations seen in this batch so - // far. + // Caches partial transaction operations. Each map entry contains a cumulative list + // of operations seen in this batch so far. stdx::unordered_map<OpTime, std::vector<OplogEntry*>, OpTime::Hasher> partialTxnOps; // Provided to _addOplogChainOpsToWriterVectors() when 'partialTxnOps' does not have any entries @@ -791,7 +790,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( // Flush partialTxnList operations for current transaction. auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); invariant(partialTxnList->empty(), op.toStringForLogging()); } else { // The applyOps entry was not generated as part of a transaction. @@ -811,18 +810,21 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( if (repl::feature_flags::gApplyPreparedTxnsInParallel.isEnabled( serverGlobalParams.featureCompatibility)) { - // Prepare entries in secondary mode do not come in their own batch, we will extract - // applyOps operations and fill writers with the extracted operations. + // Prepare entries in secondary mode do not come in their own batch, extract applyOps + // operations and fill writers with the extracted operations. if (op.shouldPrepare() && (getOptions().mode == OplogApplication::Mode::kSecondary)) { auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); - invariant(partialTxnList->empty(), op.toStringForLogging()); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); continue; } - if (op.isPreparedCommit() && + + // Fill the writers with commit or abort operation. Depending on whether the operation + // refers to a split prepare, it might also be split into multiple ops. + if ((op.isPreparedCommit() || op.isPreparedAbort()) && (getOptions().mode == OplogApplication::Mode::kSecondary)) { - // TODO (SERVER-72762): split commit oplog entries. + OplogApplierUtils::addDerivedCommitsOrAborts( + opCtx, &op, writerVectors, &collPropertiesCache); continue; } } @@ -833,7 +835,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( if (op.isPreparedCommit() && (getOptions().mode == OplogApplication::Mode::kInitialSync)) { auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); invariant(partialTxnList->empty(), op.toStringForLogging()); continue; } diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp index 42e978295a7..ed68d116056 100644 --- a/src/mongo/db/repl/oplog_applier_impl_test.cpp +++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp @@ -4824,18 +4824,39 @@ protected: _writerPool.get()); } - auto makePrepareOplogForCrudBatch(const std::vector<BSONObj>& batch, - const OpTime opTime, - const LogicalSessionId& lsid, - TxnNumber txnNumber) { + auto makePrepareOplogEntry(const std::vector<BSONObj>& docs, + const OpTime opTime, + const LogicalSessionId& lsid, + TxnNumber txnNumber) { BSONArrayBuilder arrBuilder; - arrBuilder.append(batch.begin(), batch.end()); + arrBuilder.append(docs.begin(), docs.end()); auto command = BSON("applyOps" << arrBuilder.arr() << "prepare" << true); return makeCommandOplogEntryWithSessionInfoAndStmtIds( opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, OpTime()); } + auto makeCommitOplogEntry(const OpTime opTime, + const Timestamp commitTs, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const OpTime prevOpTime) { + auto command = BSON("commitTransaction" << 1 << "commitTimestamp" << commitTs); + + return makeCommandOplogEntryWithSessionInfoAndStmtIds( + opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, prevOpTime); + } + + auto makeAbortOplogEntry(const OpTime opTime, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + const OpTime prevOpTime) { + auto command = BSON("abortTransaction" << 1); + + return makeCommandOplogEntryWithSessionInfoAndStmtIds( + opTime, _cmdNss, command, lsid, TxnNumber(txnNumber), {StmtId(0)}, prevOpTime); + } + int filterConfigTransactionsEntryFromWriterVectors(WriterVectors& writerVectors) { int txnTableOps = 0; for (auto& vector : writerVectors) { @@ -4870,7 +4891,7 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) { // changes. const int kNumEntries = _writerPool->getStats().options.maxThreads * 1000; - std::vector<OplogEntry> ops; + std::vector<OplogEntry> prepareOps; std::vector<BSONObj> cruds1; std::vector<BSONObj> cruds2; cruds1.reserve(kNumEntries); @@ -4886,28 +4907,29 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) { << BSON("_id" << i + kNumEntries))); } - ops.push_back(makePrepareOplogForCrudBatch(cruds1, {Timestamp(1, 1), 1}, _lsid1, _txnNum1)); - ops.push_back(makePrepareOplogForCrudBatch(cruds2, {Timestamp(1, 2), 1}, _lsid2, _txnNum2)); + prepareOps.push_back(makePrepareOplogEntry(cruds1, {Timestamp(1, 1), 1}, _lsid1, _txnNum1)); + prepareOps.push_back(makePrepareOplogEntry(cruds2, {Timestamp(1, 2), 1}, _lsid2, _txnNum2)); - WriterVectors writerVectors(_writerPool->getStats().options.maxThreads); - std::vector<std::vector<OplogEntry>> derivedOps; - _applier->fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps); + WriterVectors prepareWriterVectors(_writerPool->getStats().options.maxThreads); + std::vector<std::vector<OplogEntry>> derivedPrepareOps; + _applier->fillWriterVectors_forTest( + _opCtx.get(), &prepareOps, &prepareWriterVectors, &derivedPrepareOps); // Verify the config.transactions collection got one entry for each prepared transaction. - int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(writerVectors); + int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(prepareWriterVectors); ASSERT_EQ(2, txnTableOps); // Verify each writer has been assigned operations for both prepared transactions. - for (auto& writer : writerVectors) { + for (auto& writer : prepareWriterVectors) { ASSERT_EQ(2, writer.size()); - ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[0].instruction); + ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[0].instruction); ASSERT_TRUE(writer[0]->shouldPrepare()); ASSERT_NE(boost::none, writer[0].subSession); ASSERT_NE(boost::none, writer[0].splitPrepareOps); ASSERT_FALSE(writer[0].splitPrepareOps->empty()); - ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[1].instruction); + ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[1].instruction); ASSERT_TRUE(writer[1]->shouldPrepare()); ASSERT_NE(boost::none, writer[1].subSession); ASSERT_NE(boost::none, writer[1].splitPrepareOps); @@ -4915,35 +4937,114 @@ TEST_F(PreparedTxnSplitTest, MultiplePrepareTxnsInSameBatch) { ASSERT_NE(writer[0].subSession->getSessionId(), writer[1].subSession->getSessionId()); } + + // Test that applying a commitTransaction or abortTransaction entry in the next batch will + // correctly split the entry and add them into those writer vectors that previously got + // assigned the prepare entry. + std::vector<OplogEntry> commitOps; + commitOps.push_back(makeCommitOplogEntry( + {Timestamp(3, 1), 1}, Timestamp(2, 1), _lsid1, _txnNum1, prepareOps[0].getOpTime())); + commitOps.push_back( + makeAbortOplogEntry({Timestamp(3, 2), 1}, _lsid2, _txnNum2, prepareOps[1].getOpTime())); + + WriterVectors commitWriterVectors(_writerPool->getStats().options.maxThreads); + std::vector<std::vector<OplogEntry>> derivedCommitOps; + _applier->fillWriterVectors_forTest( + _opCtx.get(), &commitOps, &commitWriterVectors, &derivedCommitOps); + + // Verify the config.transactions collection got one entry for each commit/abort op. + txnTableOps = filterConfigTransactionsEntryFromWriterVectors(commitWriterVectors); + ASSERT_EQ(2, txnTableOps); + + // Verify each writer has been assigned a split commit and abort op. + for (size_t i = 0; i < commitWriterVectors.size(); ++i) { + auto& commitWriter = commitWriterVectors[i]; + auto& prepareWriter = prepareWriterVectors[i]; + + ASSERT_EQ(2, commitWriter.size()); + ASSERT_EQ(2, prepareWriter.size()); + + ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp, commitWriter[0].instruction); + ASSERT_TRUE(commitWriter[0]->isPreparedCommit()); + ASSERT_EQ(prepareWriter[0].subSession->getSessionId(), + commitWriter[0].subSession->getSessionId()); + ASSERT_EQ(boost::none, commitWriter[0].splitPrepareOps); + + ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp, commitWriter[1].instruction); + ASSERT_EQ(prepareWriter[1].subSession->getSessionId(), + commitWriter[1].subSession->getSessionId()); + ASSERT_NE(boost::none, commitWriter[1].subSession); + ASSERT_EQ(boost::none, commitWriter[1].splitPrepareOps); + + ASSERT_NE(commitWriter[0].subSession->getSessionId(), + commitWriter[1].subSession->getSessionId()); + } } TEST_F(PreparedTxnSplitTest, SingleEmptyPrepareTransaction) { RAIIServerParameterControllerForTest controller("featureFlagApplyPreparedTxnsInParallel", true); - std::vector<OplogEntry> ops; - ops.push_back(makePrepareOplogForCrudBatch({}, {Timestamp(1, 1), 1}, _lsid1, _txnNum1)); + std::vector<OplogEntry> prepareOps; + prepareOps.push_back(makePrepareOplogEntry({}, {Timestamp(1, 1), 1}, _lsid1, _txnNum1)); - WriterVectors writerVectors(_writerPool->getStats().options.maxThreads); - std::vector<std::vector<OplogEntry>> derivedOps; - _applier->fillWriterVectors_forTest(_opCtx.get(), &ops, &writerVectors, &derivedOps); + WriterVectors prepareWriterVectors(_writerPool->getStats().options.maxThreads); + std::vector<std::vector<OplogEntry>> derivedPrepareOps; + _applier->fillWriterVectors_forTest( + _opCtx.get(), &prepareOps, &prepareWriterVectors, &derivedPrepareOps); // Verify the config.transactions collection got an entry for the empty prepared transaction. - int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(writerVectors); + int txnTableOps = filterConfigTransactionsEntryFromWriterVectors(prepareWriterVectors); ASSERT_EQ(1, txnTableOps); - // Verify exactly writer has been assigned operation for the empty prepared transaction. - int count = std::count_if(writerVectors.begin(), writerVectors.end(), [](auto& writer) { - if (writer.size() != 1) { - return false; + // Verify exactly one writer has been assigned operation for the empty prepared transaction. + boost::optional<uint32_t> writerId; + for (size_t i = 0; i < prepareWriterVectors.size(); ++i) { + auto& writer = prepareWriterVectors[i]; + ASSERT_LTE(writer.size(), 1); + if (writer.size() == 1) { + ASSERT_EQ(boost::none, writerId); + writerId = i; + ASSERT_EQ(ApplicationInstruction::applySplitPrepareOp, writer[0].instruction); + ASSERT_TRUE(writer[0]->shouldPrepare()); + ASSERT_NE(boost::none, writer[0].subSession); + ASSERT_NE(boost::none, writer[0].splitPrepareOps); + ASSERT_TRUE(writer[0].splitPrepareOps->empty()); } - ASSERT_EQ(ApplicationInstruction::applySplitPrepareOps, writer[0].instruction); - ASSERT_TRUE(writer[0]->shouldPrepare()); - ASSERT_NE(boost::none, writer[0].subSession); - ASSERT_NE(boost::none, writer[0].splitPrepareOps); - ASSERT_TRUE(writer[0].splitPrepareOps->empty()); - return true; - }); - ASSERT_EQ(1, count); + } + + // Test that applying a commitTransaction in the next batch will correctly split the entry + // and add it into those writer vectors that previously got assigned the prepare entry. + std::vector<OplogEntry> commitOps; + commitOps.push_back(makeCommitOplogEntry( + {Timestamp(3, 1), 1}, Timestamp(2, 1), _lsid1, _txnNum1, prepareOps[0].getOpTime())); + + WriterVectors commitWriterVectors(_writerPool->getStats().options.maxThreads); + std::vector<std::vector<OplogEntry>> derivedCommitOps; + _applier->fillWriterVectors_forTest( + _opCtx.get(), &commitOps, &commitWriterVectors, &derivedCommitOps); + + // Verify the config.transactions collection got an entry for the commit op. + txnTableOps = filterConfigTransactionsEntryFromWriterVectors(commitWriterVectors); + ASSERT_EQ(1, txnTableOps); + + // Verify exactly one writer has been assigned a split commit op. + for (size_t i = 0; i < commitWriterVectors.size(); ++i) { + auto& commitwriter = commitWriterVectors[i]; + auto& prepareWriter = prepareWriterVectors[i]; + ASSERT_LTE(commitwriter.size(), 1); + ASSERT_LTE(prepareWriter.size(), 1); + ASSERT_EQ(prepareWriter.size(), commitwriter.size()); + + if (commitwriter.size() == 1) { + ASSERT_EQ(writerId.get(), i); + ASSERT_EQ(ApplicationInstruction::applySplitCommitOrAbortOp, + commitwriter[0].instruction); + ASSERT_TRUE(commitwriter[0]->isPreparedCommit()); + ASSERT_EQ(prepareWriter[0].subSession->getSessionId(), + commitwriter[0].subSession->getSessionId()); + ASSERT_EQ(boost::none, commitwriter[0].splitPrepareOps); + } + } } class GlobalIndexTest : public OplogApplierImplTest { diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index 2f8bed62443..9a7fddb9503 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -207,47 +207,84 @@ void OplogApplierUtils::addDerivedPrepares( std::vector<std::vector<ApplierOperation>>* writerVectors, CachedCollectionProperties* collPropertiesCache) { - uint32_t bufSplits = 0; - std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size()); - - // Add the ops in the prepared transaction to the buffered writer vectors. - for (auto&& op : *derivedOps) { - auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache); - bufSplits += bufWriterVectors[writerId].size() == 1; - } - - // Create the split sessions and track them with the the session of this prepare entry. - uint32_t realSplits = std::max<uint32_t>(bufSplits, 1); + // Get the SplitPrepareSessionManager to be used to create split sessions. auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); - const auto& splitSessions = splitSessManager->splitSession( - *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), realSplits); - invariant(splitSessions.size() == realSplits); + auto splitSessFunc = [=](const std::vector<uint32_t>& writerIds) -> const auto& { + const auto& sessions = splitSessManager->splitSession( + *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), writerIds); + invariant(sessions.size() == writerIds.size()); + return sessions; + }; - // For empty (read-only) prepares, the namespace of the prepare oplog entry (admin.$cmd) - // will be used to decide which writer vector to add to. - if (!bufSplits) { + // For empty (read-only) prepares, we use the namespace of the original prepare oplog entry + // (admin.$cmd) to decide which writer thread to apply it, and assigned it a split session. + // + // The reason that we also split an empty prepare instead of treating it as some standalone + // prepare op (as the prepares in initial sync or recovery mode) is so that we can keep a + // logical invariant that all prepares in secondary mode are split, and thus we can apply + // empty and non-empty prepares in the same way. + if (derivedOps->empty()) { auto writerId = getWriterId(opCtx, prepareOp, collPropertiesCache, writerVectors->size()); + const auto& sessionInfos = splitSessFunc({writerId}); (*writerVectors)[writerId].emplace_back(prepareOp, - ApplicationInstruction::applySplitPrepareOps, - splitSessions[0], + ApplicationInstruction::applySplitPrepareOp, + sessionInfos[0].session, std::vector<const OplogEntry*>{}); return; } - // For each writer thread that has been assigned ops for this transaction, acquire a - // split session and transfer the ops to the real writer vector. + // For non-empty prepares, the namespace of each derived op in the transaction is used to + // decide which writer thread to apply it. We first add all the derived ops to a buffer + // writer vector in order to get all the writer threads needed to apply this transaction. + // We then acquire that number of split sessions and assign each writer thread a unique + // split session when moving the ops to the real writer vector. + std::set<uint32_t> writerIds; + std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size()); + for (auto&& op : *derivedOps) { + auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache); + writerIds.emplace(writerId); + } + + const auto& sessionInfos = splitSessFunc({writerIds.begin(), writerIds.end()}); for (size_t i = 0, j = 0; i < bufWriterVectors.size(); ++i) { auto& bufWriter = bufWriterVectors[i]; auto& realWriter = (*writerVectors)[i]; if (!bufWriter.empty()) { realWriter.emplace_back(prepareOp, - ApplicationInstruction::applySplitPrepareOps, - splitSessions[j++], + ApplicationInstruction::applySplitPrepareOp, + sessionInfos[j++].session, std::move(bufWriter)); } } } +void OplogApplierUtils::addDerivedCommitsOrAborts( + OperationContext* opCtx, + OplogEntry* commitOrAbortOp, + std::vector<std::vector<ApplierOperation>>* writerVectors, + CachedCollectionProperties* collPropertiesCache) { + + auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); + const auto& sessionInfos = splitSessManager->getSplitSessions(*commitOrAbortOp->getSessionId(), + *commitOrAbortOp->getTxnNumber()); + + // When this commit refers to a non-split prepare, it means the transaction was + // prepared when the node was primary or during inital sync/recovery. In this + // case we do not split the commit and just add it as-is to the writer vector. + if (!sessionInfos.has_value()) { + addToWriterVector(opCtx, commitOrAbortOp, writerVectors, collPropertiesCache); + return; + } + + // When this commit refers to a split prepare, we split the commit and add them + // to the writers that have been assigned split prepare ops. + for (const auto& sessInfo : *sessionInfos) { + auto& writer = (*writerVectors)[sessInfo.requesterId]; + writer.emplace_back( + commitOrAbortOp, ApplicationInstruction::applySplitCommitOrAbortOp, sessInfo.session); + } +} + NamespaceString OplogApplierUtils::parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { auto optionalUuid = oplogEntry.getUuid(); diff --git a/src/mongo/db/repl/oplog_applier_utils.h b/src/mongo/db/repl/oplog_applier_utils.h index 19bf846c45f..b2658ddc2b5 100644 --- a/src/mongo/db/repl/oplog_applier_utils.h +++ b/src/mongo/db/repl/oplog_applier_utils.h @@ -115,6 +115,49 @@ public: /** * Adds a set of derived prepared transaction operations to writerVectors. + * + * The prepareOp and derivedOps are inputs that we use to generate ApplierOperation's to be + * added to the writerVectors. The derivedOps contains all the CRUD ops inside the applyOps + * part of the prepareOp. When this function finishes the writerVectors may look like this: + * + * ========================== for non-empty prepared transaction ========================== + * writer vector 1: [ ] + * writer vector 2: [ + * ApplierOperation{ + * op: prepareOp, + * instruction: applySplitPrepareOp, + * subSession: <split_session_id_1>, + * splitPrepareOps: [ crud_op_1, crud_op_3 ], + * }, + * ] + * writer vector 3: [ + * // This op should already exist in the writerVector prior to this function call. + * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry }, + * ] + * writer vector 4: [ + * ApplierOperation{ + * op: prepareOp, + * instruction: applySplitPrepareOp, + * subSession: <split_session_id_2>, + * splitPrepareOps: [ crud_op_2, crud_op_4 ], + * }, + * ] + * ============================ for empty prepared transaction ============================ + * writer vector 1: [ ] + * writer vector 2: [ + * // This op should already exist in the writerVector prior to this function call. + * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry }, + * ] + * writer vector 3: [ ] + * writer vector 4: [ + * // The splitPrepareOps list is made empty, which we can still correctly apply. + * ApplierOperation{ + * op: <original_prepare_op>, + * instruction: applySplitPrepareOp, + * subSession: <split_session_id_1>, + * splitPrepareOps: [ ], + * }, + * ] */ static void addDerivedPrepares(OperationContext* opCtx, OplogEntry* prepareOp, @@ -123,9 +166,43 @@ public: CachedCollectionProperties* collPropertiesCache); /** + * Adds commit or abort transaction operations to the writerVectors. + * + * The commitOrAbortOp is the input that we use to generate ApplierOperation's to be added + * to those writerVectors that previously got assigned the prepare ops. When this function + * finishes the writerVectors may look like this: + * + * writer vector 1: [ ] + * writer vector 2: [ + * ApplierOperation{ + * op: commitOrAbortOp, + * instruction: applySplitCommitOrAbortOp, + * subSession: <split_session_id_1>, + * splitPrepareOps: [ ], + * }, + * ] + * writer vector 3: [ + * // This op should already exist in the writerVector prior to this function call. + * ApplierOperation{ op: <config.transaction_update_op>, instruction: applyOplogEntry }, + * ] + * writer vector 4: [ + * ApplierOperation{ + * op: commitOrAbortOp, + * instruction: applySplitCommitOrAbortOp, + * subSession: <split_session_id_2>, + * splitPrepareOps: [ ], + * }, + * ] + */ + static void addDerivedCommitsOrAborts(OperationContext* opCtx, + OplogEntry* commitOrAbortOp, + std::vector<std::vector<ApplierOperation>>* writerVectors, + CachedCollectionProperties* collPropertiesCache); + + /** * Returns the namespace string for this oplogEntry; if it has a UUID it looks up the - * corresponding namespace and returns it, otherwise it returns the oplog entry 'nss'. If there - * is a UUID and no namespace with that ID is found, throws NamespaceNotFound. + * corresponding namespace and returns it, otherwise it returns the oplog entry 'nss'. If + * there is a UUID and no namespace with that ID is found, throws NamespaceNotFound. */ static NamespaceString parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry); diff --git a/src/mongo/db/repl/oplog_entry.cpp b/src/mongo/db/repl/oplog_entry.cpp index 4d6fae466cb..33dfc7b514c 100644 --- a/src/mongo/db/repl/oplog_entry.cpp +++ b/src/mongo/db/repl/oplog_entry.cpp @@ -791,6 +791,10 @@ bool OplogEntry::isPreparedCommit() const { return _entry.isPreparedCommit(); } +bool OplogEntry::isPreparedAbort() const { + return _entry.isPreparedAbort(); +} + bool OplogEntry::isTerminalApplyOps() const { return _entry.isTerminalApplyOps(); } diff --git a/src/mongo/db/repl/oplog_entry.h b/src/mongo/db/repl/oplog_entry.h index eede3f30236..becd58a7aac 100644 --- a/src/mongo/db/repl/oplog_entry.h +++ b/src/mongo/db/repl/oplog_entry.h @@ -538,6 +538,13 @@ public: } /** + * Returns if this is a prepared 'abortTransaction' oplog entry. + */ + bool isPreparedAbort() const { + return getCommandType() == DurableOplogEntry::CommandType::kAbortTransaction; + } + + /** * Returns whether the oplog entry represents an applyOps which is a self-contained atomic * operation, or the last applyOps of an unprepared transaction, as opposed to part of a * prepared transaction or a non-final applyOps in a transaction. @@ -734,6 +741,7 @@ public: bool isPartialTransaction() const; bool isEndOfLargeTransaction() const; bool isPreparedCommit() const; + bool isPreparedAbort() const; bool isTerminalApplyOps() const; bool isSingleOplogEntryTransaction() const; bool isSingleOplogEntryTransactionWithCommand() const; diff --git a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h index 669e70a3c0c..20f352d6d4b 100644 --- a/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h +++ b/src/mongo/db/repl/oplog_entry_or_grouped_inserts.h @@ -38,7 +38,8 @@ namespace repl { enum class ApplicationInstruction { applyOplogEntry, - applySplitPrepareOps, + applySplitPrepareOp, + applySplitCommitOrAbortOp, }; struct ApplierOperation { diff --git a/src/mongo/db/repl/split_prepare_session_manager.cpp b/src/mongo/db/repl/split_prepare_session_manager.cpp index 7734defb25c..40fa56416f6 100644 --- a/src/mongo/db/repl/split_prepare_session_manager.cpp +++ b/src/mongo/db/repl/split_prepare_session_manager.cpp @@ -35,28 +35,32 @@ namespace repl { SplitPrepareSessionManager::SplitPrepareSessionManager(InternalSessionPool* sessionPool) : _sessionPool(sessionPool) {} -const std::vector<PooledSession>& SplitPrepareSessionManager::splitSession( - const LogicalSessionId& sessionId, TxnNumber txnNumber, uint32_t numSplits) { +const std::vector<SplitSessionInfo>& SplitPrepareSessionManager::splitSession( + const LogicalSessionId& sessionId, + TxnNumber txnNumber, + const std::vector<uint32_t>& requesterIds) { + + auto numSplits = requesterIds.size(); invariant(numSplits > 0); stdx::lock_guard<Latch> lk(_mutex); auto [it, succ] = - _splitSessionMap.try_emplace(sessionId, txnNumber, std::vector<PooledSession>()); + _splitSessionMap.try_emplace(sessionId, txnNumber, std::vector<SplitSessionInfo>()); // The session must not be split before. invariant(succ); - auto& sessions = it->second.second; - sessions.reserve(numSplits); + auto& sessionInfos = it->second.second; + sessionInfos.reserve(numSplits); - for (uint32_t i = 0; i < numSplits; ++i) { - sessions.push_back(_sessionPool->acquireSystemSession()); + for (auto reqId : requesterIds) { + sessionInfos.emplace_back(_sessionPool->acquireSystemSession(), reqId); } - return sessions; + return sessionInfos; } -boost::optional<const std::vector<PooledSession>&> SplitPrepareSessionManager::getSplitSessions( +boost::optional<const std::vector<SplitSessionInfo>&> SplitPrepareSessionManager::getSplitSessions( const LogicalSessionId& sessionId, TxnNumber txnNumber) const { stdx::lock_guard<Latch> lk(_mutex); @@ -98,9 +102,9 @@ void SplitPrepareSessionManager::releaseSplitSessions(const LogicalSessionId& se // The txnNumber must not change after the session was split. invariant(txnNumber == it->second.first); - auto& sessions = it->second.second; - for (const auto& sess : sessions) { - _sessionPool->release(sess); + auto& sessionInfos = it->second.second; + for (const auto& sessInfo : sessionInfos) { + _sessionPool->release(sessInfo.session); } _splitSessionMap.erase(it); diff --git a/src/mongo/db/repl/split_prepare_session_manager.h b/src/mongo/db/repl/split_prepare_session_manager.h index 6a53763e73b..7732ff8853e 100644 --- a/src/mongo/db/repl/split_prepare_session_manager.h +++ b/src/mongo/db/repl/split_prepare_session_manager.h @@ -39,6 +39,19 @@ namespace repl { using PooledSession = InternalSessionPool::Session; /** + * This struct contains information for a split session, including the split + * session itself and the id of the requester who wants to acquire the split + * session, which is usually a writer thread from the oplog applier. + */ +struct SplitSessionInfo { + PooledSession session; + uint32_t requesterId; + + SplitSessionInfo(PooledSession&& sess, uint32_t reqId) + : session(std::move(sess)), requesterId(reqId) {} +}; + +/** * This class manages the sessions for split prepared transactions. * * Prepared transactions are split and applied in parallel on secondaries, and this class is @@ -57,22 +70,35 @@ public: /** * Creates split sessions for the given top-level session and track the mapping. * + * The txnNumber is the txnNumber of the top-level session that needs to be split. It + * servers as a sanity check: onces a session is split, it can not be split again for + * another txnNumber until it releases the existing split sessions. + * + * The requesterIds is a sorted list of requesters (usually writer threads from the + * oplog applier), each of whom wants to acquire a split session. + * * Asserts if the given session is already split. */ - const std::vector<PooledSession>& splitSession(const LogicalSessionId& sessionId, - TxnNumber txnNumber, - uint32_t numSplits); + const std::vector<SplitSessionInfo>& splitSession(const LogicalSessionId& sessionId, + TxnNumber txnNumber, + const std::vector<uint32_t>& requesterIds); /** * Returns a vector of split sessions for the given top-level session, or nothing if * the given session has not been split. + * + * The txnNumber servers a sanity check to make sure it is the same as the one being + * used for splitSession(). */ - boost::optional<const std::vector<PooledSession>&> getSplitSessions( + boost::optional<const std::vector<SplitSessionInfo>&> getSplitSessions( const LogicalSessionId& sessionId, TxnNumber txnNumber) const; /** * Returns true if the given session has been split, or false otherwise. This can be * used as an alternative to getSplitSessionIds() when the result is not needed. + * + * The txnNumber servers a sanity check to make sure it is the same as the one being + * used for splitSession(). */ bool isSessionSplit(const LogicalSessionId& sessionId, TxnNumber txnNumber) const; @@ -80,6 +106,9 @@ public: * Releases all the split sessions of the give top-level session into the session pool * and stops tracking their mapping. * + * The txnNumber servers a sanity check to make sure it is the same as the one being + * used for splitSession(). + * * Asserts if the given session is not split. */ void releaseSplitSessions(const LogicalSessionId& sessionId, TxnNumber txnNumber); @@ -92,7 +121,7 @@ private: InternalSessionPool* _sessionPool; // A map to track top-level sessions and their splits. - LogicalSessionIdMap<std::pair<TxnNumber, std::vector<PooledSession>>> _splitSessionMap; + LogicalSessionIdMap<std::pair<TxnNumber, std::vector<SplitSessionInfo>>> _splitSessionMap; }; } // namespace repl diff --git a/src/mongo/db/repl/split_prepare_session_manager_test.cpp b/src/mongo/db/repl/split_prepare_session_manager_test.cpp index 8e475c2651a..91f2f06d45b 100644 --- a/src/mongo/db/repl/split_prepare_session_manager_test.cpp +++ b/src/mongo/db/repl/split_prepare_session_manager_test.cpp @@ -60,25 +60,38 @@ TEST_F(SplitPrepareSessionManagerTest, SplitSessionsBasic) { const auto& topLevelSessId1 = makeSystemLogicalSessionId(); const auto& topLevelSessId2 = makeSystemLogicalSessionId(); const TxnNumber txnNumber1(100), txnNumber2(200); - const int numSplits1 = 1, numSplits2 = 5; - - const auto& sessions1 = - _splitSessManager->splitSession(topLevelSessId1, txnNumber1, numSplits1); - const auto& sessions2 = - _splitSessManager->splitSession(topLevelSessId2, txnNumber2, numSplits2); - - ASSERT_EQ(numSplits1, sessions1.size()); - ASSERT_EQ(numSplits2, sessions2.size()); - - std::vector<LogicalSessionId> sessionIds1(sessions1.size()), sessionIds2(sessions2.size()); - auto sessionToId = [](const PooledSession& session) { return session.getSessionId(); }; - std::transform(sessions1.begin(), sessions1.end(), sessionIds1.begin(), sessionToId); - std::transform(sessions2.begin(), sessions2.end(), sessionIds2.begin(), sessionToId); + const std::vector<uint32_t> requesterIds1{4}; + const std::vector<uint32_t> requesterIds2{1, 3, 5, 7, 9}; + const size_t numSplits1 = requesterIds1.size(); + const size_t numSplits2 = requesterIds2.size(); + + const auto& sessInfos1 = + _splitSessManager->splitSession(topLevelSessId1, txnNumber1, requesterIds1); + const auto& sessInfos2 = + _splitSessManager->splitSession(topLevelSessId2, txnNumber2, requesterIds2); + + ASSERT_EQ(numSplits1, sessInfos1.size()); + ASSERT_EQ(numSplits2, sessInfos2.size()); + + std::vector<LogicalSessionId> sessionIds1(numSplits1), sessionIds2(numSplits2); + std::vector<uint32_t> returnedIds1(numSplits1), returnedIds2(numSplits2); + auto sessInfoToSessId = [](const SplitSessionInfo& sessInfo) { + return sessInfo.session.getSessionId(); + }; + auto sessInfoToReqId = [](const SplitSessionInfo& sessInfo) { return sessInfo.requesterId; }; + std::transform(sessInfos1.begin(), sessInfos1.end(), sessionIds1.begin(), sessInfoToSessId); + std::transform(sessInfos2.begin(), sessInfos2.end(), sessionIds2.begin(), sessInfoToSessId); + std::transform(sessInfos1.begin(), sessInfos1.end(), returnedIds1.begin(), sessInfoToReqId); + std::transform(sessInfos2.begin(), sessInfos2.end(), returnedIds2.begin(), sessInfoToReqId); // Make sure the split sessions have unique sessionIds. ASSERT_EQ(numSplits1, LogicalSessionIdSet(sessionIds1.begin(), sessionIds1.end()).size()); ASSERT_EQ(numSplits2, LogicalSessionIdSet(sessionIds2.begin(), sessionIds2.end()).size()); + // Make sure the returned requesterIds match the original ones. + ASSERT_EQ(requesterIds1, returnedIds1); + ASSERT_EQ(requesterIds2, returnedIds2); + ASSERT_EQ(numSplits1, _splitSessManager->getSplitSessions(topLevelSessId1, txnNumber1)->size()); ASSERT_EQ(numSplits2, _splitSessManager->getSplitSessions(topLevelSessId2, txnNumber2)->size()); ASSERT_EQ(true, _splitSessManager->isSessionSplit(topLevelSessId1, txnNumber1)); @@ -96,13 +109,15 @@ TEST_F(SplitPrepareSessionManagerTest, SplitSessionsBasic) { DEATH_TEST_F(SplitPrepareSessionManagerTest, SplitAlreadySplitSessions, "invariant") { const auto& topLevelSessId = makeSystemLogicalSessionId(); const TxnNumber txnNumber(100); - const int numSplits = 3; + const std::vector<uint32_t> requesterIds{2, 4, 6}; + const size_t numSplits = requesterIds.size(); - const auto& sessions = _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits); - ASSERT_EQ(numSplits, sessions.size()); + const auto& sessInfos = + _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds); + ASSERT_EQ(numSplits, sessInfos.size()); // Attempting to split an already split top-level session should fail. - _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits + 1); + _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds); } DEATH_TEST_F(SplitPrepareSessionManagerTest, ReleaseNonSplitSessions, "invariant") { @@ -115,10 +130,12 @@ DEATH_TEST_F(SplitPrepareSessionManagerTest, ReleaseNonSplitSessions, "invariant DEATH_TEST_F(SplitPrepareSessionManagerTest, ChangeTxnNumberAfterSessionSplit, "invariant") { const auto& topLevelSessId = makeSystemLogicalSessionId(); const TxnNumber txnNumber(100); - const int numSplits = 3; + const std::vector<uint32_t> requesterIds{2, 4, 6}; + const size_t numSplits = requesterIds.size(); - const auto& sessions = _splitSessManager->splitSession(topLevelSessId, txnNumber, numSplits); - ASSERT_EQ(numSplits, sessions.size()); + const auto& sessInfos = + _splitSessManager->splitSession(topLevelSessId, txnNumber, requesterIds); + ASSERT_EQ(numSplits, sessInfos.size()); // Attempting to release a top-level session with different txnNumber should fail. _splitSessManager->releaseSplitSessions(topLevelSessId, txnNumber + 1); diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp index b408894fcff..ca711476df9 100644 --- a/src/mongo/db/transaction/transaction_participant.cpp +++ b/src/mongo/db/transaction/transaction_participant.cpp @@ -2020,7 +2020,7 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary( const Timestamp& commitTimestamp, const Timestamp& durableTimestamp) { - for (const repl::PooledSession& session : + for (const auto& sessInfos : splitPrepareManager->getSplitSessions(userSessionId, userTxnNumber).get()) { auto splitClientOwned = parentOpCtx->getServiceContext()->makeClient("tempSplitClient"); @@ -2030,6 +2030,7 @@ void TransactionParticipant::Participant::_commitSplitPreparedTxnOnPrimary( std::unique_ptr<MongoDSessionCatalog::Session> checkedOutSession; repl::UnreplicatedWritesBlock notReplicated(splitOpCtx.get()); + const auto& session = sessInfos.session; splitOpCtx->setLogicalSessionId(session.getSessionId()); splitOpCtx->setTxnNumber(session.getTxnNumber()); splitOpCtx->setInMultiDocumentTransaction(); diff --git a/src/mongo/db/transaction/transaction_participant_test.cpp b/src/mongo/db/transaction/transaction_participant_test.cpp index f5b53a25d60..60ccf9c4b76 100644 --- a/src/mongo/db/transaction/transaction_participant_test.cpp +++ b/src/mongo/db/transaction/transaction_participant_test.cpp @@ -5980,19 +5980,18 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { // TxnResources start in the "stashed" state. userTxnParticipant.unstashTransactionResources(opCtx, "crud ops"); - // Hold the collection lock/datastructure such that it can be released prior to rollback. + // Hold the collection lock/data structure such that it can be released prior to rollback. boost::optional<AutoGetCollection> userColl; userColl.emplace(opCtx, kNss, LockMode::MODE_IX); // We split our user session into 2 split sessions. - const int numSplits = 2; + const std::vector<uint32_t> requesterIds{1, 3}; auto* splitPrepareManager = repl::ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); - const std::vector<InternalSessionPool::Session>& splitSessions = - splitPrepareManager->splitSession( - opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), numSplits); + const auto& splitSessInfos = splitPrepareManager->splitSession( + opCtx->getLogicalSessionId().get(), opCtx->getTxnNumber().get(), requesterIds); // Insert an `_id: 1` document. - callUnderSplitSession(splitSessions[0], [nullOpDbg](OperationContext* opCtx) { + callUnderSplitSession(splitSessInfos[0].session, [nullOpDbg](OperationContext* opCtx) { AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); ASSERT_OK( collection_internal::insertDocument(opCtx, @@ -6002,7 +6001,7 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { }); // Insert an `_id: 2` document. - callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) { + callUnderSplitSession(splitSessInfos[1].session, [nullOpDbg](OperationContext* opCtx) { AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); ASSERT_OK( collection_internal::insertDocument(opCtx, @@ -6013,7 +6012,7 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { // Update `2` to increment its `value` to 2. This must be done in the same split session as the // insert. - callUnderSplitSession(splitSessions[1], [nullOpDbg](OperationContext* opCtx) { + callUnderSplitSession(splitSessInfos[1].session, [nullOpDbg](OperationContext* opCtx) { AutoGetCollection userColl(opCtx, kNss, LockMode::MODE_IX); Helpers::update( opCtx, userColl->ns(), BSON("_id" << 2), BSON("$inc" << BSON("value" << 1))); @@ -6024,11 +6023,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { // `UnreplicatedWritesBlock` and explicitly pass in the prepare OpTime. const Timestamp prepTs = startTs; const repl::OpTime prepOpTime(prepTs, 1); - callUnderSplitSession(splitSessions[0], [prepOpTime](OperationContext* opCtx) { + callUnderSplitSession(splitSessInfos[0].session, [prepOpTime](OperationContext* opCtx) { auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.prepareTransaction(opCtx, prepOpTime); }); - callUnderSplitSession(splitSessions[1], [prepOpTime](OperationContext* opCtx) { + callUnderSplitSession(splitSessInfos[1].session, [prepOpTime](OperationContext* opCtx) { auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant.prepareTransaction(opCtx, prepOpTime); }); @@ -6114,11 +6113,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { // Rather than testing the implementation, we'll assert on the weakest necessary state. A // split `config.transactions` document may or may not exist. If it exists, it must be // in the "committed" state. - for (auto idx = 0; idx < numSplits; ++idx) { + for (const auto& sessInfo : splitSessInfos) { BSONObj splitTxnObj = Helpers::findOneForTesting( opCtx, configTransactions.getCollection(), - BSON("_id.id" << splitSessions[idx].getSessionId().getId()), + BSON("_id.id" << sessInfo.session.getSessionId().getId()), !invariantOnError); if (!splitTxnObj.isEmpty()) { assertSessionState(splitTxnObj, DurableTxnStateEnum::kCommitted); @@ -6180,11 +6179,11 @@ TEST_F(TxnParticipantTest, SplitTransactionOnPrepare) { // Rather than testing the implementation, we'll assert on the weakest necessary state. A split // `config.transactions` document may or may not exist. If it exists, it must not* be in the // "prepared" state. - for (auto idx = 0; idx < numSplits; ++idx) { + for (const auto& sessInfo : splitSessInfos) { BSONObj splitTxnObj = Helpers::findOneForTesting(opCtx, configTransactions.getCollection(), - BSON("_id.id" << splitSessions[idx].getSessionId().getId()), + BSON("_id.id" << sessInfo.session.getSessionId().getId()), !invariantOnError); if (!splitTxnObj.isEmpty()) { assertNotInSessionState(splitTxnObj, DurableTxnStateEnum::kPrepared); |