diff options
Diffstat (limited to 'src/mongo/db/repl/oplog_applier_impl.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_applier_impl.cpp | 28 |
1 files changed, 15 insertions, 13 deletions
diff --git a/src/mongo/db/repl/oplog_applier_impl.cpp b/src/mongo/db/repl/oplog_applier_impl.cpp index 35fa85e2ced..885d9f923cd 100644 --- a/src/mongo/db/repl/oplog_applier_impl.cpp +++ b/src/mongo/db/repl/oplog_applier_impl.cpp @@ -116,8 +116,8 @@ void _addOplogChainOpsToWriterVectors(OperationContext* opCtx, std::vector<OplogEntry*>* partialTxnList, std::vector<std::vector<OplogEntry>>* derivedOps, OplogEntry* op, - CachedCollectionProperties* collPropertiesCache, - std::vector<std::vector<ApplierOperation>>* writerVectors) { + std::vector<std::vector<ApplierOperation>>* writerVectors, + CachedCollectionProperties* collPropertiesCache) { auto [txnOps, shouldSerialize] = readTransactionOperationsFromOplogChainAndCheckForCommands(opCtx, *op, *partialTxnList); derivedOps->emplace_back(std::move(txnOps)); @@ -695,9 +695,8 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( std::vector<std::vector<OplogEntry>>* derivedOps, SessionUpdateTracker* sessionUpdateTracker) noexcept { - // Caches partial transaction operations. Each map entry - // contains a cumulative list of operations seen in this batch so - // far. + // Caches partial transaction operations. Each map entry contains a cumulative list + // of operations seen in this batch so far. stdx::unordered_map<OpTime, std::vector<OplogEntry*>, OpTime::Hasher> partialTxnOps; // Provided to _addOplogChainOpsToWriterVectors() when 'partialTxnOps' does not have any entries @@ -791,7 +790,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( // Flush partialTxnList operations for current transaction. auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); invariant(partialTxnList->empty(), op.toStringForLogging()); } else { // The applyOps entry was not generated as part of a transaction. @@ -811,18 +810,21 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( if (repl::feature_flags::gApplyPreparedTxnsInParallel.isEnabled( serverGlobalParams.featureCompatibility)) { - // Prepare entries in secondary mode do not come in their own batch, we will extract - // applyOps operations and fill writers with the extracted operations. + // Prepare entries in secondary mode do not come in their own batch, extract applyOps + // operations and fill writers with the extracted operations. if (op.shouldPrepare() && (getOptions().mode == OplogApplication::Mode::kSecondary)) { auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); - invariant(partialTxnList->empty(), op.toStringForLogging()); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); continue; } - if (op.isPreparedCommit() && + + // Fill the writers with commit or abort operation. Depending on whether the operation + // refers to a split prepare, it might also be split into multiple ops. + if ((op.isPreparedCommit() || op.isPreparedAbort()) && (getOptions().mode == OplogApplication::Mode::kSecondary)) { - // TODO (SERVER-72762): split commit oplog entries. + OplogApplierUtils::addDerivedCommitsOrAborts( + opCtx, &op, writerVectors, &collPropertiesCache); continue; } } @@ -833,7 +835,7 @@ void OplogApplierImpl::_deriveOpsAndFillWriterVectors( if (op.isPreparedCommit() && (getOptions().mode == OplogApplication::Mode::kInitialSync)) { auto* partialTxnList = getPartialTxnList(op); _addOplogChainOpsToWriterVectors( - opCtx, partialTxnList, derivedOps, &op, &collPropertiesCache, writerVectors); + opCtx, partialTxnList, derivedOps, &op, writerVectors, &collPropertiesCache); invariant(partialTxnList->empty(), op.toStringForLogging()); continue; } |