diff options
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 101 |
2 files changed, 56 insertions, 49 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index aec7175d0cc..a098a5b7b38 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -393,11 +393,11 @@ void _logOpsInner(OperationContext* opCtx, checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - opCtx->recoveryUnit()->onCommit([replCoord, finalOpTime] { + opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] { // Optimes on the primary should always represent consistent database states. replCoord->setMyLastAppliedOpTimeForward( finalOpTime, ReplicationCoordinator::DataConsistency::Consistent); - ReplClientInfo::forClient(Client::getCurrent()).setLastOp(finalOpTime); + ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); }); } diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 1310b261932..dcc17c26dd9 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -54,7 +54,8 @@ namespace mongo { namespace { -void fassertOnRepeatedExecution(const LogicalSessionId& lsid, +void fassertOnRepeatedExecution(OperationContext* opCtx, + const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, const repl::OpTime& firstOpTime, @@ -112,7 +113,8 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); if (!insertRes.second) { const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(lsid, + fassertOnRepeatedExecution(opCtx, + lsid, result.lastTxnRecord->getTxnNum(), *entry.getStatementId(), existingOpTime, @@ -287,7 +289,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); if (stmtOpTime) { fassertOnRepeatedExecution( - _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); + opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); } } @@ -613,61 +615,66 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit( - [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] { - RetryableWritesStats::get(getGlobalServiceContext()) - ->incrementTransactionsCollectionWriteCount(); + opCtx->recoveryUnit()->onCommit([ + this, + opCtx, + newTxnNumber, + stmtIdsWritten = std::move(stmtIdsWritten), + lastStmtIdWriteOpTime + ] { + RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount(); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); - if (!_isValid) - return; + if (!_isValid) + return; - // The cache of the last written record must always be advanced after a write so that - // subsequent writes have the correct point to start from. - if (!_lastWrittenSessionRecord) { - _lastWrittenSessionRecord.emplace(); + // The cache of the last written record must always be advanced after a write so that + // subsequent writes have the correct point to start from. + if (!_lastWrittenSessionRecord) { + _lastWrittenSessionRecord.emplace(); - _lastWrittenSessionRecord->setSessionId(_sessionId); + _lastWrittenSessionRecord->setSessionId(_sessionId); + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } else { + if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + + if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } else { - if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + } - if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } + if (newTxnNumber > _activeTxnNumber) { + // This call is necessary in order to advance the txn number and reset the cached state + // in the case where just before the storage transaction commits, the cache entry gets + // invalidated and immediately refreshed while there were no writes for newTxnNumber + // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to + // update the cache even though the write was successful. + _beginOrContinueTxn(lg, newTxnNumber, boost::none); + } - if (newTxnNumber > _activeTxnNumber) { - // This call is necessary in order to advance the txn number and reset the cached - // state in the case where just before the storage transaction commits, the cache - // entry gets invalidated and immediately refreshed while there were no writes for - // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber - // and we will fail to update the cache even though the write was successful. - _beginOrContinueTxn(lg, newTxnNumber, boost::none); - } + if (newTxnNumber == _activeTxnNumber) { + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } - if (newTxnNumber == _activeTxnNumber) { - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - _hasIncompleteHistory = true; - continue; - } - - const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(_sessionId, - newTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); - } + const auto insertRes = + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(opCtx, + _sessionId, + newTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); } } - }); + } + }); MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { const auto& data = customArgs.getData(); |