summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/db/op_observer_impl.cpp8
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp4
-rw-r--r--src/mongo/db/repl/oplog.cpp3
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp32
-rw-r--r--src/mongo/db/transaction_participant.cpp115
-rw-r--r--src/mongo/db/transaction_participant.h16
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp39
8 files changed, 76 insertions, 143 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 524157fafa1..c67b51c8717 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -190,7 +190,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs&
if (txnParticipant) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
}
OpTimeBundle opTimes;
@@ -254,7 +254,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
if (txnParticipant) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
}
OpTimeBundle opTimes;
@@ -940,7 +940,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
const auto txnParticipant = TransactionParticipant::get(opCtx);
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
// Until we support multiple oplog entries per transaction, prevOpTime should always be null.
invariant(oplogLink.prevOpTime.isNull());
@@ -983,7 +983,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
const auto txnParticipant = TransactionParticipant::get(opCtx);
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
const StmtId stmtId(1);
const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index b802c28a301..08afeb96bd0 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -521,9 +521,9 @@ protected:
const auto txnParticipant = TransactionParticipant::get(session());
if (!opTime.isNull()) {
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
- ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime());
} else {
- ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime());
}
}
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index 3c8078cd4fc..0e244b9a0d0 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -551,8 +551,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
if (txnParticipant) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
-
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
}
auto timestamps = stdx::make_unique<Timestamp[]>(count);
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index 0e04d7462c6..0cbe33a8d74 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -277,7 +277,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(result.txnNum);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime();
writeConflictRetry(
opCtx,
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index e3d00084e90..a0817bad177 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -356,7 +356,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -418,7 +418,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(txnNum));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -470,7 +470,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -539,7 +539,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
auto session = getSessionWithTxn(opCtx, sessionId1, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
@@ -551,7 +551,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
auto session = getSessionWithTxn(opCtx, sessionId2, 42);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(42));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -615,7 +615,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog2, historyIter.next(opCtx));
@@ -666,7 +666,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -755,7 +755,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -847,7 +847,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -947,7 +947,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
auto session = getSessionWithTxn(opCtx, sessionId, 20);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
@@ -1009,7 +1009,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
auto session = getSessionWithTxn(opCtx, sessionId, 20);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
@@ -1189,7 +1189,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
auto session = getSessionWithTxn(opCtx, sessionId, 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
@@ -1493,7 +1493,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
auto session = getSessionWithTxn(opCtx, sessionId, 19);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(19));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -1558,7 +1558,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2);
const auto txnParticipant = TransactionParticipant::get(session.get());
- TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx));
@@ -1641,7 +1641,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
{
auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3);
const auto txnParticipant1 = TransactionParticipant::get(session1.get());
- ASSERT(txnParticipant1->getLastWriteOpTime(3).isNull());
+ ASSERT(txnParticipant1->getLastWriteOpTime().isNull());
}
// Check session 2 was correctly updated
@@ -1649,7 +1649,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15);
const auto txnParticipant2 = TransactionParticipant::get(session2.get());
- TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime(15));
+ TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime());
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx));
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index aa01c5e0aa2..2dd2dba889f 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1590,6 +1590,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
}
_activeTxnNumber = txnNumber;
+ _lastWriteOpTime = repl::OpTime();
// Reset the retryable writes state
_resetRetryableWriteState(wl);
@@ -1614,10 +1615,11 @@ void TransactionParticipant::refreshFromStorageIfNeeded() {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
+ const auto& lastTxnRecord = activeTxnHistory.lastTxnRecord;
- if (_lastWrittenSessionRecord) {
- _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
+ if (lastTxnRecord) {
+ _activeTxnNumber = lastTxnRecord->getTxnNum();
+ _lastWriteOpTime = lastTxnRecord->getLastWriteOpTime();
_activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
_hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
@@ -1654,15 +1656,14 @@ void TransactionParticipant::onWriteOpCompletedOnPrimary(
}
const auto updateRequest =
- _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
+ _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
ul.unlock();
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
updateSessionEntry(opCtx, updateRequest);
- _registerUpdateCacheOnCommit(
- opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+ _registerUpdateCacheOnCommit(std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx,
@@ -1678,24 +1679,23 @@ void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx
_checkValid(ul);
_checkIsActiveTransaction(ul, txnNumber);
- // We do not migrate transaction oplog entries.
- auto txnState = boost::none;
- const auto updateRequest = _makeUpdateRequest(
- ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState);
+ // We do not migrate transaction oplog entries so don't set the txn state
+ const auto txnState = boost::none;
+ const auto updateRequest =
+ _makeUpdateRequest(lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState);
ul.unlock();
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
updateSessionEntry(opCtx, updateRequest);
- _registerUpdateCacheOnCommit(
- opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+ _registerUpdateCacheOnCommit(std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
void TransactionParticipant::_invalidate(WithLock) {
_isValid = false;
_activeTxnNumber = kUninitializedTxnNumber;
- _lastWrittenSessionRecord.reset();
+ _lastWriteOpTime = repl::OpTime();
// Reset the transactions metrics.
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
@@ -1767,15 +1767,9 @@ void TransactionParticipant::abortPreparedTransactionForRollback() {
_resetTransactionState(lg, TransactionState::kNone);
}
-repl::OpTime TransactionParticipant::getLastWriteOpTime(TxnNumber txnNumber) const {
+repl::OpTime TransactionParticipant::getLastWriteOpTime() const {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- _checkValid(lg);
- _checkIsActiveTransaction(lg, txnNumber);
-
- if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
- return {};
-
- return _lastWrittenSessionRecord->getLastWriteOpTime();
+ return _lastWriteOpTime;
}
boost::optional<repl::OplogEntry> TransactionParticipant::checkStatementExecuted(
@@ -1832,15 +1826,10 @@ boost::optional<repl::OpTime> TransactionParticipant::_checkStatementExecuted(St
return boost::none;
}
- invariant(_lastWrittenSessionRecord);
- invariant(_lastWrittenSessionRecord->getTxnNum() == _activeTxnNumber);
-
return it->second;
}
UpdateRequest TransactionParticipant::_makeUpdateRequest(
- WithLock,
- TxnNumber newTxnNumber,
const repl::OpTime& newLastWriteOpTime,
Date_t newLastWriteDate,
boost::optional<DurableTxnStateEnum> newState) const {
@@ -1849,7 +1838,7 @@ UpdateRequest TransactionParticipant::_makeUpdateRequest(
const auto updateBSON = [&] {
SessionTxnRecord newTxnRecord;
newTxnRecord.setSessionId(_sessionId());
- newTxnRecord.setTxnNum(newTxnNumber);
+ newTxnRecord.setTxnNum(_activeTxnNumber);
newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
newTxnRecord.setLastWriteDate(newLastWriteDate);
newTxnRecord.setState(newState);
@@ -1863,63 +1852,36 @@ UpdateRequest TransactionParticipant::_makeUpdateRequest(
}
void TransactionParticipant::_registerUpdateCacheOnCommit(
- OperationContext* opCtx,
- TxnNumber newTxnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
- opCtx->recoveryUnit()->onCommit(
- [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
+ std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) {
+ _opCtx()->recoveryUnit()->onCommit(
+ [ this, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
boost::optional<Timestamp>) {
+ invariant(_isValid);
+
RetryableWritesStats::get(getGlobalServiceContext())
->incrementTransactionsCollectionWriteCount();
stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (!_isValid)
- return;
-
// The cache of the last written record must always be advanced after a write so that
// subsequent writes have the correct point to start from.
- if (!_lastWrittenSessionRecord) {
- _lastWrittenSessionRecord.emplace();
-
- _lastWrittenSessionRecord->setSessionId(_sessionId());
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- } else {
- if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+ _lastWriteOpTime = lastStmtIdWriteOpTime;
- if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- }
-
- if (newTxnNumber > _activeTxnNumber) {
- // This call is necessary in order to advance the txn number and reset the cached
- // state in the case where just before the storage transaction commits, the cache
- // entry gets invalidated and immediately refreshed while there were no writes for
- // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
- // and we will fail to update the cache even though the write was successful.
- _beginOrContinueRetryableWrite(lg, newTxnNumber);
- }
+ for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
- if (newTxnNumber == _activeTxnNumber) {
- for (const auto stmtId : stmtIdsWritten) {
- if (stmtId == kIncompleteHistoryStmtId) {
- _hasIncompleteHistory = true;
- continue;
- }
-
- const auto insertRes =
- _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(_sessionId(),
- newTxnNumber,
- stmtId,
- existingOpTime,
- lastStmtIdWriteOpTime);
- }
+ const auto insertRes =
+ _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(_sessionId(),
+ _activeTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
}
}
});
@@ -1929,14 +1891,15 @@ void TransactionParticipant::_registerUpdateCacheOnCommit(
const auto closeConnectionElem = data["closeConnection"];
if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
- opCtx->getClient()->session()->end();
+ _opCtx()->getClient()->session()->end();
}
const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
if (!failBeforeCommitExceptionElem.eoo()) {
const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
uasserted(failureCode,
- str::stream() << "Failing write for " << _sessionId() << ":" << newTxnNumber
+ str::stream() << "Failing write for " << _sessionId() << ":"
+ << _activeTxnNumber
<< " due to failpoint. The write must not be reflected.");
}
}
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 234e27f2b44..b8ac97b7e5f 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -395,7 +395,7 @@ public:
*
* Throws if the session has been invalidated or the active transaction number doesn't match.
*/
- repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const;
+ repl::OpTime getLastWriteOpTime() const;
/**
* Returns the prepare op time that was selected for the transaction, which can be Null if the
@@ -644,15 +644,11 @@ private:
boost::optional<repl::OpTime> _checkStatementExecuted(StmtId stmtId) const;
- UpdateRequest _makeUpdateRequest(WithLock,
- TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteOpTime,
+ UpdateRequest _makeUpdateRequest(const repl::OpTime& newLastWriteOpTime,
Date_t newLastWriteDate,
boost::optional<DurableTxnStateEnum> newState) const;
- void _registerUpdateCacheOnCommit(OperationContext* opCtx,
- TxnNumber newTxnNumber,
- std::vector<StmtId> stmtIdsWritten,
+ void _registerUpdateCacheOnCommit(std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
// Called for speculative transactions to fix the optime of the snapshot to read from.
@@ -772,6 +768,9 @@ private:
// means a new transaction has begun on the session, but it hasn't yet performed any writes.
TxnNumber _activeTxnNumber{kUninitializedTxnNumber};
+ // Caches what is known to be the last optime written for the active transaction.
+ repl::OpTime _lastWriteOpTime;
+
// Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long
// a snapshot will remain open and available. Checked in combination with _txnState to determine
// whether the transaction should be aborted.
@@ -827,9 +826,6 @@ private:
// truncated because it was too old.
bool _hasIncompleteHistory{false};
- // Caches what is known to be the last written transaction record for the session
- boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;
-
// For the active txn, tracks which statement ids have been committed and at which oplog
// opTime. Used for fast retryability check and retrieving the previous write's data without
// having to scan through the oplog.
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 638f931ef20..0a7f938b391 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -242,11 +242,11 @@ protected:
txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
const auto txnParticipant = TransactionParticipant::get(session);
- ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime());
txnParticipant->invalidate();
txnParticipant->refreshFromStorageIfNeeded();
- ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime());
}
private:
@@ -260,7 +260,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin)
const TxnNumber txnNum = 20;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
+ ASSERT(txnParticipant->getLastWriteOpTime().isNull());
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -292,7 +292,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
ASSERT(!txnRecord.getState());
- ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime());
}
TEST_F(TransactionParticipantRetryableWritesTest,
@@ -318,11 +318,11 @@ TEST_F(TransactionParticipantRetryableWritesTest,
ASSERT_EQ(200, txnRecord.getTxnNum());
ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime());
ASSERT(!txnRecord.getState());
- ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
+ ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime());
txnParticipant->invalidate();
txnParticipant->refreshFromStorageIfNeeded();
- ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
+ ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime());
}
TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) {
@@ -349,7 +349,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNum - 1, boost::none, boost::none),
AssertionException,
ErrorCodes::TransactionTooOld);
- ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
+ ASSERT(txnParticipant->getLastWriteOpTime().isNull());
}
TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) {
@@ -464,31 +464,6 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest,
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
}
-TEST_F(TransactionParticipantRetryableWritesTest,
- WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
- const auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant->refreshFromStorageIfNeeded();
-
- const TxnNumber txnNum = 100;
- txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
-
- {
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
- txnParticipant->onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
-
- txnParticipant->invalidate();
-
- wuow.commit();
- }
-
- txnParticipant->refreshFromStorageIfNeeded();
- ASSERT(txnParticipant->checkStatementExecuted(0));
-}
-
TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) {
const auto sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 2;