summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSiyuan Zhou <siyuan.zhou@mongodb.com>2018-02-22 17:11:06 -0500
committerSiyuan Zhou <siyuan.zhou@mongodb.com>2018-02-26 15:30:58 -0500
commitd6c49820b590d23b7d09ef4d8aba2218a7181dd2 (patch)
tree001d5d42a81de65af9dcf53da750fa28face8a0a
parent5e3564a89ed631e2b1372eb455a8344f21089264 (diff)
downloadmongo-d6c49820b590d23b7d09ef4d8aba2218a7181dd2.tar.gz
SERVER-33215 Remove references to opCtx in onCommit callbacks.
WriteUnitOfWork in multi-statement transactions will call onCommit on commitTransaction command where references to opCtx of previous commands are no longer valid.
-rw-r--r--src/mongo/db/repl/oplog.cpp4
-rw-r--r--src/mongo/db/session.cpp101
2 files changed, 49 insertions, 56 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index a098a5b7b38..aec7175d0cc 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -393,11 +393,11 @@ void _logOpsInner(OperationContext* opCtx,
checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
+ opCtx->recoveryUnit()->onCommit([replCoord, finalOpTime] {
// Optimes on the primary should always represent consistent database states.
replCoord->setMyLastAppliedOpTimeForward(
finalOpTime, ReplicationCoordinator::DataConsistency::Consistent);
- ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
+ ReplClientInfo::forClient(Client::getCurrent()).setLastOp(finalOpTime);
});
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 4cf3ae5b468..8b358ed1901 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -54,8 +54,7 @@
namespace mongo {
namespace {
-void fassertOnRepeatedExecution(OperationContext* opCtx,
- const LogicalSessionId& lsid,
+void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
TxnNumber txnNumber,
StmtId stmtId,
const repl::OpTime& firstOpTime,
@@ -113,8 +112,7 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
if (!insertRes.second) {
const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(opCtx,
- lsid,
+ fassertOnRepeatedExecution(lsid,
result.lastTxnRecord->getTxnNum(),
*entry.getStatementId(),
existingOpTime,
@@ -289,7 +287,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
if (stmtOpTime) {
fassertOnRepeatedExecution(
- opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
+ _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
}
}
@@ -622,66 +620,61 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
- opCtx->recoveryUnit()->onCommit([
- this,
- opCtx,
- newTxnNumber,
- stmtIdsWritten = std::move(stmtIdsWritten),
- lastStmtIdWriteOpTime
- ] {
- RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount();
+ opCtx->recoveryUnit()->onCommit(
+ [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] {
+ RetryableWritesStats::get(getGlobalServiceContext())
+ ->incrementTransactionsCollectionWriteCount();
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (!_isValid)
- return;
+ if (!_isValid)
+ return;
- // The cache of the last written record must always be advanced after a write so that
- // subsequent writes have the correct point to start from.
- if (!_lastWrittenSessionRecord) {
- _lastWrittenSessionRecord.emplace();
+ // The cache of the last written record must always be advanced after a write so that
+ // subsequent writes have the correct point to start from.
+ if (!_lastWrittenSessionRecord) {
+ _lastWrittenSessionRecord.emplace();
- _lastWrittenSessionRecord->setSessionId(_sessionId);
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- } else {
- if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
+ _lastWrittenSessionRecord->setSessionId(_sessionId);
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
-
- if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
_lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- }
+ } else {
+ if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- if (newTxnNumber > _activeTxnNumber) {
- // This call is necessary in order to advance the txn number and reset the cached state
- // in the case where just before the storage transaction commits, the cache entry gets
- // invalidated and immediately refreshed while there were no writes for newTxnNumber
- // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to
- // update the cache even though the write was successful.
- _beginOrContinueTxn(lg, newTxnNumber, boost::none);
- }
+ if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
+ }
- if (newTxnNumber == _activeTxnNumber) {
- for (const auto stmtId : stmtIdsWritten) {
- if (stmtId == kIncompleteHistoryStmtId) {
- _hasIncompleteHistory = true;
- continue;
- }
+ if (newTxnNumber > _activeTxnNumber) {
+ // This call is necessary in order to advance the txn number and reset the cached
+ // state in the case where just before the storage transaction commits, the cache
+ // entry gets invalidated and immediately refreshed while there were no writes for
+ // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
+ // and we will fail to update the cache even though the write was successful.
+ _beginOrContinueTxn(lg, newTxnNumber, boost::none);
+ }
- const auto insertRes =
- _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(opCtx,
- _sessionId,
- newTxnNumber,
- stmtId,
- existingOpTime,
- lastStmtIdWriteOpTime);
+ if (newTxnNumber == _activeTxnNumber) {
+ for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
+
+ const auto insertRes =
+ _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(_sessionId,
+ newTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
+ }
}
}
- }
- });
+ });
MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
const auto& data = customArgs.getData();