summary | refs | log | tree | commit | diff
path: root/src/mongo/db/repl/sync_tail.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- src/mongo/db/repl/sync_tail.cpp | 62
1 files changed, 50 insertions, 12 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 6b33953ca12..2c3e6317dc0 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -796,7 +796,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord,
// Apply the operations in this batch. 'multiApply' returns the optime of the last op that
// was applied, which should be the last optime in the batch.
auto lastOpTimeAppliedInBatch =
- fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch()));
+ fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none));
invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch);
// In order to provide resilience in the event of a crash in the middle of batch
@@ -847,6 +847,12 @@ inline bool isCommitApplyOps(const OplogEntry& entry) {
!entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare");
}
+// Returns true if 'entry' is a commitTransaction command (the commit of a prepared transaction).
+inline bool isPreparedCommit(const OplogEntry& entry) {
+ return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction;
+}
+
+
void SyncTail::shutdown() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
_inShutdown = true;
@@ -1109,7 +1115,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
std::vector<MultiApplier::Operations>* derivedOps,
- SessionUpdateTracker* sessionUpdateTracker) {
+ SessionUpdateTracker* sessionUpdateTracker,
+ boost::optional<repl::OplogApplication::Mode> mode) {
const auto serviceContext = opCtx->getServiceContext();
const auto storageEngine = serviceContext->getStorageEngine();
@@ -1137,7 +1144,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
if (sessionUpdateTracker) {
if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) {
derivedOps->emplace_back(std::move(*newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1199,13 +1207,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// messing up the state of the opCtx. In particular we do not want to
// set the ReadSource to kLastApplied.
ReadSourceScope readSourceScope(opCtx);
- derivedOps->emplace_back(
- readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList));
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, op, partialTxnList, boost::none));
partialTxnList.clear();
}
// Transaction entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
} else {
// The applyOps entry was not generated as part of a transaction.
invariant(!op.getPrevWriteOpTimeInTransaction());
@@ -1213,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
// Nested entries cannot have different session updates.
_fillWriterVectors(
- opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
} catch (...) {
fassertFailedWithStatusNoTrace(
@@ -1225,6 +1233,32 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
continue;
}
+ // If we see a commitTransaction command that is a part of a prepared transaction during
+ // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers
+ // with the extracted operations.
+ if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) {
+ auto logicalSessionId = op.getSessionId();
+ auto& partialTxnList = partialTxnOps[*logicalSessionId];
+
+ {
+ // Traverse the oplog chain with its own snapshot and read timestamp.
+ ReadSourceScope readSourceScope(opCtx);
+
+ // Get the previous oplog entry, which should be a prepare oplog entry.
+ const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op);
+ invariant(prevOplogEntry.shouldPrepare());
+
+ // Extract the operations from the applyOps entry.
+ auto commitOplogEntryOpTime = op.getOpTime();
+ derivedOps->emplace_back(readTransactionOperationsFromOplogChain(
+ opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp()));
+ }
+
+ _fillWriterVectors(
+ opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
+ continue;
+ }
+
auto& writer = (*writerVectors)[hash % numWriters];
if (writer.empty()) {
writer.reserve(8); // Skip a few growth rounds
@@ -1236,14 +1270,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx,
void SyncTail::fillWriterVectors(OperationContext* opCtx,
MultiApplier::Operations* ops,
std::vector<MultiApplier::OperationPtrs>* writerVectors,
- std::vector<MultiApplier::Operations>* derivedOps) {
+ std::vector<MultiApplier::Operations>* derivedOps,
+ boost::optional<repl::OplogApplication::Mode> mode) {
SessionUpdateTracker sessionUpdateTracker;
- _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker);
+ _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode);
auto newOplogWrites = sessionUpdateTracker.flushAll();
if (!newOplogWrites.empty()) {
derivedOps->emplace_back(std::move(newOplogWrites));
- _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr);
+ _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode);
}
}
@@ -1275,10 +1310,13 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors
}
}
-StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) {
+StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx,
+ MultiApplier::Operations ops,
+ boost::optional<repl::OplogApplication::Mode> mode) {
invariant(!ops.empty());
LOG(2) << "replication batch size is " << ops.size();
+
// Stop all readers until we're done. This also prevents doc-locking engines from deleting old
// entries from the oplog until we finish writing.
Lock::ParallelBatchWriterMode pbwm(opCtx->lockState());
@@ -1318,7 +1356,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O
std::vector<MultiApplier::Operations> derivedOps;
std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads);
- fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps);
+ fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode);
// Wait for writes to finish before applying ops.
_writerPool->waitForIdle();