diff options
Diffstat (limited to 'src/mongo/db/repl/sync_tail.cpp')
-rw-r--r-- | src/mongo/db/repl/sync_tail.cpp | 62 |
1 files changed, 50 insertions, 12 deletions
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 6b33953ca12..2c3e6317dc0 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -796,7 +796,7 @@ void SyncTail::_oplogApplication(ReplicationCoordinator* replCoord, // Apply the operations in this batch. 'multiApply' returns the optime of the last op that // was applied, which should be the last optime in the batch. auto lastOpTimeAppliedInBatch = - fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch())); + fassertNoTrace(34437, multiApply(&opCtx, ops.releaseBatch(), boost::none)); invariant(lastOpTimeAppliedInBatch == lastOpTimeInBatch); // In order to provide resilience in the event of a crash in the middle of batch @@ -847,6 +847,12 @@ inline bool isCommitApplyOps(const OplogEntry& entry) { !entry.isPartialTransaction() && !entry.getObject().getBoolField("prepare"); } +// Returns whether a commitTransaction oplog entry is a part of a prepared transaction. +inline bool isPreparedCommit(const OplogEntry& entry) { + return entry.getCommandType() == OplogEntry::CommandType::kCommitTransaction; +} + + void SyncTail::shutdown() { stdx::lock_guard<stdx::mutex> lock(_mutex); _inShutdown = true; @@ -1109,7 +1115,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, std::vector<MultiApplier::Operations>* derivedOps, - SessionUpdateTracker* sessionUpdateTracker) { + SessionUpdateTracker* sessionUpdateTracker, + boost::optional<repl::OplogApplication::Mode> mode) { const auto serviceContext = opCtx->getServiceContext(); const auto storageEngine = serviceContext->getStorageEngine(); @@ -1137,7 +1144,8 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, if (sessionUpdateTracker) { if (auto newOplogWrites = sessionUpdateTracker->updateSession(op)) { derivedOps->emplace_back(std::move(*newOplogWrites)); - _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + _fillWriterVectors( + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } @@ -1199,13 +1207,13 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // messing up the state of the opCtx. In particular we do not want to // set the ReadSource to kLastApplied. ReadSourceScope readSourceScope(opCtx); - derivedOps->emplace_back( - readTransactionOperationsFromOplogChain(opCtx, op, partialTxnList)); + derivedOps->emplace_back(readTransactionOperationsFromOplogChain( + opCtx, op, partialTxnList, boost::none)); partialTxnList.clear(); } // Transaction entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } else { // The applyOps entry was not generated as part of a transaction. invariant(!op.getPrevWriteOpTimeInTransaction()); @@ -1213,7 +1221,7 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, // Nested entries cannot have different session updates. _fillWriterVectors( - opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } catch (...) { fassertFailedWithStatusNoTrace( @@ -1225,6 +1233,32 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, continue; } + // If we see a commitTransaction command that is a part of a prepared transaction during + // initial sync, find the prepare oplog entry, extract applyOps operations, and fill writers + // with the extracted operations. + if (isPreparedCommit(op) && (mode == OplogApplication::Mode::kInitialSync)) { + auto logicalSessionId = op.getSessionId(); + auto& partialTxnList = partialTxnOps[*logicalSessionId]; + + { + // Traverse the oplog chain with its own snapshot and read timestamp. + ReadSourceScope readSourceScope(opCtx); + + // Get the previous oplog entry, which should be a prepare oplog entry. + const auto prevOplogEntry = getPreviousOplogEntry(opCtx, op); + invariant(prevOplogEntry.shouldPrepare()); + + // Extract the operations from the applyOps entry. + auto commitOplogEntryOpTime = op.getOpTime(); + derivedOps->emplace_back(readTransactionOperationsFromOplogChain( + opCtx, prevOplogEntry, partialTxnList, commitOplogEntryOpTime.getTimestamp())); + } + + _fillWriterVectors( + opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); + continue; + } + auto& writer = (*writerVectors)[hash % numWriters]; if (writer.empty()) { writer.reserve(8); // Skip a few growth rounds @@ -1236,14 +1270,15 @@ void SyncTail::_fillWriterVectors(OperationContext* opCtx, void SyncTail::fillWriterVectors(OperationContext* opCtx, MultiApplier::Operations* ops, std::vector<MultiApplier::OperationPtrs>* writerVectors, - std::vector<MultiApplier::Operations>* derivedOps) { + std::vector<MultiApplier::Operations>* derivedOps, + boost::optional<repl::OplogApplication::Mode> mode) { SessionUpdateTracker sessionUpdateTracker; - _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker); + _fillWriterVectors(opCtx, ops, writerVectors, derivedOps, &sessionUpdateTracker, mode); auto newOplogWrites = sessionUpdateTracker.flushAll(); if (!newOplogWrites.empty()) { derivedOps->emplace_back(std::move(newOplogWrites)); - _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr); + _fillWriterVectors(opCtx, &derivedOps->back(), writerVectors, derivedOps, nullptr, mode); } } @@ -1275,10 +1310,13 @@ void SyncTail::_applyOps(std::vector<MultiApplier::OperationPtrs>& writerVectors } } -StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::Operations ops) { +StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, + MultiApplier::Operations ops, + boost::optional<repl::OplogApplication::Mode> mode) { invariant(!ops.empty()); LOG(2) << "replication batch size is " << ops.size(); + // Stop all readers until we're done. This also prevents doc-locking engines from deleting old // entries from the oplog until we finish writing. Lock::ParallelBatchWriterMode pbwm(opCtx->lockState()); @@ -1318,7 +1356,7 @@ StatusWith<OpTime> SyncTail::multiApply(OperationContext* opCtx, MultiApplier::O std::vector<MultiApplier::Operations> derivedOps; std::vector<MultiApplier::OperationPtrs> writerVectors(_writerPool->getStats().numThreads); - fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps); + fillWriterVectors(opCtx, &ops, &writerVectors, &derivedOps, mode); // Wait for writes to finish before applying ops. _writerPool->waitForIdle(); |