diff options
author | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-02-22 17:11:06 -0500 |
---|---|---|
committer | Siyuan Zhou <siyuan.zhou@mongodb.com> | 2018-02-26 15:30:58 -0500 |
commit | d6c49820b590d23b7d09ef4d8aba2218a7181dd2 (patch) | |
tree | 001d5d42a81de65af9dcf53da750fa28face8a0a | |
parent | 5e3564a89ed631e2b1372eb455a8344f21089264 (diff) | |
download | mongo-d6c49820b590d23b7d09ef4d8aba2218a7181dd2.tar.gz |
SERVER-33215 Remove references to opCtx in onCommit callbacks.
WriteUnitOfWork in multi-statement transactions will call onCommit on
commitTransaction command where references to opCtx of previous commands
are no longer valid.
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 101 |
2 files changed, 49 insertions, 56 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index a098a5b7b38..aec7175d0cc 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -393,11 +393,11 @@ void _logOpsInner(OperationContext* opCtx, checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs)); // Set replCoord last optime only after we're sure the WUOW didn't abort and roll back. - opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] { + opCtx->recoveryUnit()->onCommit([replCoord, finalOpTime] { // Optimes on the primary should always represent consistent database states. replCoord->setMyLastAppliedOpTimeForward( finalOpTime, ReplicationCoordinator::DataConsistency::Consistent); - ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime); + ReplClientInfo::forClient(Client::getCurrent()).setLastOp(finalOpTime); }); } diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 4cf3ae5b468..8b358ed1901 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -54,8 +54,7 @@ namespace mongo { namespace { -void fassertOnRepeatedExecution(OperationContext* opCtx, - const LogicalSessionId& lsid, +void fassertOnRepeatedExecution(const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, const repl::OpTime& firstOpTime, @@ -113,8 +112,7 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); if (!insertRes.second) { const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(opCtx, - lsid, + fassertOnRepeatedExecution(lsid, result.lastTxnRecord->getTxnNum(), *entry.getStatementId(), existingOpTime, @@ -289,7 +287,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); if (stmtOpTime) { fassertOnRepeatedExecution( - opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); + _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); } } @@ -622,66 +620,61 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit([ - this, - opCtx, - newTxnNumber, - stmtIdsWritten = std::move(stmtIdsWritten), - lastStmtIdWriteOpTime - ] { - RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount(); + opCtx->recoveryUnit()->onCommit( + [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] { + RetryableWritesStats::get(getGlobalServiceContext()) + ->incrementTransactionsCollectionWriteCount(); - stdx::lock_guard<stdx::mutex> lg(_mutex); + stdx::lock_guard<stdx::mutex> lg(_mutex); - if (!_isValid) - return; + if (!_isValid) + return; - // The cache of the last written record must always be advanced after a write so that - // subsequent writes have the correct point to start from. - if (!_lastWrittenSessionRecord) { - _lastWrittenSessionRecord.emplace(); + // The cache of the last written record must always be advanced after a write so that + // subsequent writes have the correct point to start from. + if (!_lastWrittenSessionRecord) { + _lastWrittenSessionRecord.emplace(); - _lastWrittenSessionRecord->setSessionId(_sessionId); - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } else { - if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) + _lastWrittenSessionRecord->setSessionId(_sessionId); _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - - if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } + } else { + if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - if (newTxnNumber > _activeTxnNumber) { - // This call is necessary in order to advance the txn number and reset the cached state - // in the case where just before the storage transaction commits, the cache entry gets - // invalidated and immediately refreshed while there were no writes for newTxnNumber - // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to - // update the cache even though the write was successful. - _beginOrContinueTxn(lg, newTxnNumber, boost::none); - } + if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } - if (newTxnNumber == _activeTxnNumber) { - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - _hasIncompleteHistory = true; - continue; - } + if (newTxnNumber > _activeTxnNumber) { + // This call is necessary in order to advance the txn number and reset the cached + // state in the case where just before the storage transaction commits, the cache + // entry gets invalidated and immediately refreshed while there were no writes for + // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber + // and we will fail to update the cache even though the write was successful. + _beginOrContinueTxn(lg, newTxnNumber, boost::none); + } - const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(opCtx, - _sessionId, - newTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); + if (newTxnNumber == _activeTxnNumber) { + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } + + const auto insertRes = + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(_sessionId, + newTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); + } } } - } - }); + }); MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { const auto& data = customArgs.getData(); |