diff options
Diffstat (limited to 'src/mongo/db/repl/oplog_applier_utils.cpp')
-rw-r--r-- | src/mongo/db/repl/oplog_applier_utils.cpp | 83 |
1 files changed, 60 insertions, 23 deletions
diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp index 2f8bed62443..9a7fddb9503 100644 --- a/src/mongo/db/repl/oplog_applier_utils.cpp +++ b/src/mongo/db/repl/oplog_applier_utils.cpp @@ -207,47 +207,84 @@ void OplogApplierUtils::addDerivedPrepares( std::vector<std::vector<ApplierOperation>>* writerVectors, CachedCollectionProperties* collPropertiesCache) { - uint32_t bufSplits = 0; - std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size()); - - // Add the ops in the prepared transaction to the buffered writer vectors. - for (auto&& op : *derivedOps) { - auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache); - bufSplits += bufWriterVectors[writerId].size() == 1; - } - - // Create the split sessions and track them with the the session of this prepare entry. - uint32_t realSplits = std::max<uint32_t>(bufSplits, 1); + // Get the SplitPrepareSessionManager to be used to create split sessions. auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); - const auto& splitSessions = splitSessManager->splitSession( - *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), realSplits); - invariant(splitSessions.size() == realSplits); + auto splitSessFunc = [=](const std::vector<uint32_t>& writerIds) -> const auto& { + const auto& sessions = splitSessManager->splitSession( + *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), writerIds); + invariant(sessions.size() == writerIds.size()); + return sessions; + }; - // For empty (read-only) prepares, the namespace of the prepare oplog entry (admin.$cmd) - // will be used to decide which writer vector to add to. - if (!bufSplits) { + // For empty (read-only) prepares, we use the namespace of the original prepare oplog entry + // (admin.$cmd) to decide which writer thread to apply it, and assigned it a split session. + // + // The reason that we also split an empty prepare instead of treating it as some standalone + // prepare op (as the prepares in initial sync or recovery mode) is so that we can keep a + // logical invariant that all prepares in secondary mode are split, and thus we can apply + // empty and non-empty prepares in the same way. + if (derivedOps->empty()) { auto writerId = getWriterId(opCtx, prepareOp, collPropertiesCache, writerVectors->size()); + const auto& sessionInfos = splitSessFunc({writerId}); (*writerVectors)[writerId].emplace_back(prepareOp, - ApplicationInstruction::applySplitPrepareOps, - splitSessions[0], + ApplicationInstruction::applySplitPrepareOp, + sessionInfos[0].session, std::vector<const OplogEntry*>{}); return; } - // For each writer thread that has been assigned ops for this transaction, acquire a - // split session and transfer the ops to the real writer vector. + // For non-empty prepares, the namespace of each derived op in the transaction is used to + // decide which writer thread to apply it. We first add all the derived ops to a buffer + // writer vector in order to get all the writer threads needed to apply this transaction. + // We then acquire that number of split sessions and assign each writer thread a unique + // split session when moving the ops to the real writer vector. + std::set<uint32_t> writerIds; + std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size()); + for (auto&& op : *derivedOps) { + auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache); + writerIds.emplace(writerId); + } + + const auto& sessionInfos = splitSessFunc({writerIds.begin(), writerIds.end()}); for (size_t i = 0, j = 0; i < bufWriterVectors.size(); ++i) { auto& bufWriter = bufWriterVectors[i]; auto& realWriter = (*writerVectors)[i]; if (!bufWriter.empty()) { realWriter.emplace_back(prepareOp, - ApplicationInstruction::applySplitPrepareOps, - splitSessions[j++], + ApplicationInstruction::applySplitPrepareOp, + sessionInfos[j++].session, std::move(bufWriter)); } } } +void OplogApplierUtils::addDerivedCommitsOrAborts( + OperationContext* opCtx, + OplogEntry* commitOrAbortOp, + std::vector<std::vector<ApplierOperation>>* writerVectors, + CachedCollectionProperties* collPropertiesCache) { + + auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager(); + const auto& sessionInfos = splitSessManager->getSplitSessions(*commitOrAbortOp->getSessionId(), + *commitOrAbortOp->getTxnNumber()); + + // When this commit refers to a non-split prepare, it means the transaction was + // prepared when the node was primary or during inital sync/recovery. In this + // case we do not split the commit and just add it as-is to the writer vector. + if (!sessionInfos.has_value()) { + addToWriterVector(opCtx, commitOrAbortOp, writerVectors, collPropertiesCache); + return; + } + + // When this commit refers to a split prepare, we split the commit and add them + // to the writers that have been assigned split prepare ops. + for (const auto& sessInfo : *sessionInfos) { + auto& writer = (*writerVectors)[sessInfo.requesterId]; + writer.emplace_back( + commitOrAbortOp, ApplicationInstruction::applySplitCommitOrAbortOp, sessInfo.session); + } +} + NamespaceString OplogApplierUtils::parseUUIDOrNs(OperationContext* opCtx, const OplogEntry& oplogEntry) { auto optionalUuid = oplogEntry.getUuid(); |