summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2018-02-27 13:46:01 -0500
committerJack Mulrow <jack.mulrow@mongodb.com>2018-02-27 13:46:01 -0500
commitd4ae81a7154ab57a266b38d4fe41dd12a3c4540a (patch)
treec977e45f9d282c57af49618a718326530feeef70
parent4a69dcaf78f29cf440ada961c7e59de6b5dde111 (diff)
downloadmongo-d4ae81a7154ab57a266b38d4fe41dd12a3c4540a.tar.gz
Revert "SERVER-33215 Remove references to opCtx in onCommit callbacks."
This reverts commit d6c49820b590d23b7d09ef4d8aba2218a7181dd2.
-rw-r--r--src/mongo/db/repl/oplog.cpp4
-rw-r--r--src/mongo/db/session.cpp101
2 files changed, 56 insertions, 49 deletions
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index aec7175d0cc..a098a5b7b38 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -393,11 +393,11 @@ void _logOpsInner(OperationContext* opCtx,
checkOplogInsert(oplogCollection->insertDocumentsForOplog(opCtx, writers, timestamps, nDocs));
// Set replCoord last optime only after we're sure the WUOW didn't abort and roll back.
- opCtx->recoveryUnit()->onCommit([replCoord, finalOpTime] {
+ opCtx->recoveryUnit()->onCommit([opCtx, replCoord, finalOpTime] {
// Optimes on the primary should always represent consistent database states.
replCoord->setMyLastAppliedOpTimeForward(
finalOpTime, ReplicationCoordinator::DataConsistency::Consistent);
- ReplClientInfo::forClient(Client::getCurrent()).setLastOp(finalOpTime);
+ ReplClientInfo::forClient(opCtx->getClient()).setLastOp(finalOpTime);
});
}
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 1310b261932..dcc17c26dd9 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -54,7 +54,8 @@
namespace mongo {
namespace {
-void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
+void fassertOnRepeatedExecution(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
TxnNumber txnNumber,
StmtId stmtId,
const repl::OpTime& firstOpTime,
@@ -112,7 +113,8 @@ ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
if (!insertRes.second) {
const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(lsid,
+ fassertOnRepeatedExecution(opCtx,
+ lsid,
result.lastTxnRecord->getTxnNum(),
*entry.getStatementId(),
existingOpTime,
@@ -287,7 +289,7 @@ void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
if (stmtOpTime) {
fassertOnRepeatedExecution(
- _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
+ opCtx, _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
}
}
@@ -613,61 +615,66 @@ void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
- opCtx->recoveryUnit()->onCommit(
- [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ] {
- RetryableWritesStats::get(getGlobalServiceContext())
- ->incrementTransactionsCollectionWriteCount();
+ opCtx->recoveryUnit()->onCommit([
+ this,
+ opCtx,
+ newTxnNumber,
+ stmtIdsWritten = std::move(stmtIdsWritten),
+ lastStmtIdWriteOpTime
+ ] {
+ RetryableWritesStats::get(opCtx)->incrementTransactionsCollectionWriteCount();
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (!_isValid)
- return;
+ if (!_isValid)
+ return;
- // The cache of the last written record must always be advanced after a write so that
- // subsequent writes have the correct point to start from.
- if (!_lastWrittenSessionRecord) {
- _lastWrittenSessionRecord.emplace();
+ // The cache of the last written record must always be advanced after a write so that
+ // subsequent writes have the correct point to start from.
+ if (!_lastWrittenSessionRecord) {
+ _lastWrittenSessionRecord.emplace();
- _lastWrittenSessionRecord->setSessionId(_sessionId);
+ _lastWrittenSessionRecord->setSessionId(_sessionId);
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
+ } else {
+ if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+
+ if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
_lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- } else {
- if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+ }
- if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- }
+ if (newTxnNumber > _activeTxnNumber) {
+ // This call is necessary in order to advance the txn number and reset the cached state
+ // in the case where just before the storage transaction commits, the cache entry gets
+ // invalidated and immediately refreshed while there were no writes for newTxnNumber
+ // yet. In this case _activeTxnNumber will be less than newTxnNumber and we will fail to
+ // update the cache even though the write was successful.
+ _beginOrContinueTxn(lg, newTxnNumber, boost::none);
+ }
- if (newTxnNumber > _activeTxnNumber) {
- // This call is necessary in order to advance the txn number and reset the cached
- // state in the case where just before the storage transaction commits, the cache
- // entry gets invalidated and immediately refreshed while there were no writes for
- // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
- // and we will fail to update the cache even though the write was successful.
- _beginOrContinueTxn(lg, newTxnNumber, boost::none);
- }
+ if (newTxnNumber == _activeTxnNumber) {
+ for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
- if (newTxnNumber == _activeTxnNumber) {
- for (const auto stmtId : stmtIdsWritten) {
- if (stmtId == kIncompleteHistoryStmtId) {
- _hasIncompleteHistory = true;
- continue;
- }
-
- const auto insertRes =
- _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(_sessionId,
- newTxnNumber,
- stmtId,
- existingOpTime,
- lastStmtIdWriteOpTime);
- }
+ const auto insertRes =
+ _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(opCtx,
+ _sessionId,
+ newTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
}
}
- });
+ }
+ });
MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
const auto& data = customArgs.getData();