summaryrefslogtreecommitdiff
path: root/src/mongo/db/repl/oplog_applier_utils.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/oplog_applier_utils.cpp')
-rw-r--r--src/mongo/db/repl/oplog_applier_utils.cpp83
1 files changed, 60 insertions, 23 deletions
diff --git a/src/mongo/db/repl/oplog_applier_utils.cpp b/src/mongo/db/repl/oplog_applier_utils.cpp
index 2f8bed62443..9a7fddb9503 100644
--- a/src/mongo/db/repl/oplog_applier_utils.cpp
+++ b/src/mongo/db/repl/oplog_applier_utils.cpp
@@ -207,47 +207,84 @@ void OplogApplierUtils::addDerivedPrepares(
std::vector<std::vector<ApplierOperation>>* writerVectors,
CachedCollectionProperties* collPropertiesCache) {
- uint32_t bufSplits = 0;
- std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size());
-
- // Add the ops in the prepared transaction to the buffered writer vectors.
- for (auto&& op : *derivedOps) {
- auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache);
- bufSplits += bufWriterVectors[writerId].size() == 1;
- }
-
- // Create the split sessions and track them with the the session of this prepare entry.
- uint32_t realSplits = std::max<uint32_t>(bufSplits, 1);
+ // Get the SplitPrepareSessionManager to be used to create split sessions.
auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
- const auto& splitSessions = splitSessManager->splitSession(
- *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), realSplits);
- invariant(splitSessions.size() == realSplits);
+ auto splitSessFunc = [=](const std::vector<uint32_t>& writerIds) -> const auto& {
+ const auto& sessions = splitSessManager->splitSession(
+ *prepareOp->getSessionId(), *prepareOp->getTxnNumber(), writerIds);
+ invariant(sessions.size() == writerIds.size());
+ return sessions;
+ };
- // For empty (read-only) prepares, the namespace of the prepare oplog entry (admin.$cmd)
- // will be used to decide which writer vector to add to.
- if (!bufSplits) {
+ // For empty (read-only) prepares, we use the namespace of the original prepare oplog entry
+ // (admin.$cmd) to decide which writer thread to apply it, and assign it a split session.
+ //
+ // The reason that we also split an empty prepare instead of treating it as some standalone
+ // prepare op (as the prepares in initial sync or recovery mode) is so that we can keep a
+ // logical invariant that all prepares in secondary mode are split, and thus we can apply
+ // empty and non-empty prepares in the same way.
+ if (derivedOps->empty()) {
auto writerId = getWriterId(opCtx, prepareOp, collPropertiesCache, writerVectors->size());
+ const auto& sessionInfos = splitSessFunc({writerId});
(*writerVectors)[writerId].emplace_back(prepareOp,
- ApplicationInstruction::applySplitPrepareOps,
- splitSessions[0],
+ ApplicationInstruction::applySplitPrepareOp,
+ sessionInfos[0].session,
std::vector<const OplogEntry*>{});
return;
}
- // For each writer thread that has been assigned ops for this transaction, acquire a
- // split session and transfer the ops to the real writer vector.
+ // For non-empty prepares, the namespace of each derived op in the transaction is used to
+ // decide which writer thread to apply it. We first add all the derived ops to a buffer
+ // writer vector in order to get all the writer threads needed to apply this transaction.
+ // We then acquire that number of split sessions and assign each writer thread a unique
+ // split session when moving the ops to the real writer vector.
+ std::set<uint32_t> writerIds;
+ std::vector<std::vector<const OplogEntry*>> bufWriterVectors(writerVectors->size());
+ for (auto&& op : *derivedOps) {
+ auto writerId = addToWriterVector(opCtx, &op, &bufWriterVectors, collPropertiesCache);
+ writerIds.emplace(writerId);
+ }
+
+ const auto& sessionInfos = splitSessFunc({writerIds.begin(), writerIds.end()});
for (size_t i = 0, j = 0; i < bufWriterVectors.size(); ++i) {
auto& bufWriter = bufWriterVectors[i];
auto& realWriter = (*writerVectors)[i];
if (!bufWriter.empty()) {
realWriter.emplace_back(prepareOp,
- ApplicationInstruction::applySplitPrepareOps,
- splitSessions[j++],
+ ApplicationInstruction::applySplitPrepareOp,
+ sessionInfos[j++].session,
std::move(bufWriter));
}
}
}
+void OplogApplierUtils::addDerivedCommitsOrAborts(
+ OperationContext* opCtx,
+ OplogEntry* commitOrAbortOp,
+ std::vector<std::vector<ApplierOperation>>* writerVectors,
+ CachedCollectionProperties* collPropertiesCache) {
+
+ auto splitSessManager = ReplicationCoordinator::get(opCtx)->getSplitPrepareSessionManager();
+ const auto& sessionInfos = splitSessManager->getSplitSessions(*commitOrAbortOp->getSessionId(),
+ *commitOrAbortOp->getTxnNumber());
+
+ // When this commit refers to a non-split prepare, it means the transaction was
+ // prepared when the node was primary or during initial sync/recovery. In this
+ // case we do not split the commit and just add it as-is to the writer vector.
+ if (!sessionInfos.has_value()) {
+ addToWriterVector(opCtx, commitOrAbortOp, writerVectors, collPropertiesCache);
+ return;
+ }
+
+ // When this commit or abort refers to a split prepare, we split it and add one
+ // split op to each writer that has been assigned split prepare ops.
+ for (const auto& sessInfo : *sessionInfos) {
+ auto& writer = (*writerVectors)[sessInfo.requesterId];
+ writer.emplace_back(
+ commitOrAbortOp, ApplicationInstruction::applySplitCommitOrAbortOp, sessInfo.session);
+ }
+}
+
NamespaceString OplogApplierUtils::parseUUIDOrNs(OperationContext* opCtx,
const OplogEntry& oplogEntry) {
auto optionalUuid = oplogEntry.getUuid();