diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-12-19 16:36:28 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-12-22 09:11:43 -0500 |
commit | b30c434c313dde1ded0bc0d73d264d27b1eafaa6 (patch) | |
tree | 9e356a7c465931dacca25e9da58db349c24dd344 /src/mongo/db | |
parent | 22d19f03b6ce14b9bc4cacd8c15faa1fa5f15ff2 (diff) | |
download | mongo-b30c434c313dde1ded0bc0d73d264d27b1eafaa6.tar.gz |
SERVER-38713 Get rid of transaction number equality checking from _makeUpdateRequest/_registerUpdateCacheOnCommit
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 8 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/oplog.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 32 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 115 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 16 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 39 |
8 files changed, 76 insertions, 143 deletions
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 524157fafa1..c67b51c8717 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -190,7 +190,7 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, const OplogUpdateEntryArgs& if (txnParticipant) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); } OpTimeBundle opTimes; @@ -254,7 +254,7 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, if (txnParticipant) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); } OpTimeBundle opTimes; @@ -940,7 +940,7 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); const auto txnParticipant = TransactionParticipant::get(opCtx); - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); // Until we support multiple oplog entries per transaction, prevOpTime should always be null. invariant(oplogLink.prevOpTime.isNull()); @@ -983,7 +983,7 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); const auto txnParticipant = TransactionParticipant::get(opCtx); - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); const StmtId stmtId(1); const auto wallClockTime = getWallClockTimeForOpLog(opCtx); diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index b802c28a301..08afeb96bd0 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -521,9 +521,9 @@ protected: const auto txnParticipant = TransactionParticipant::get(session()); if (!opTime.isNull()) { ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); - ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime()); } else { - ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime(txnNum)); + ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime()); } } diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index 3c8078cd4fc..0e244b9a0d0 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -551,8 +551,7 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, if (txnParticipant) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); } auto timestamps = stdx::make_unique<Timestamp[]>(count); diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index 0e04d7462c6..0cbe33a8d74 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -277,7 +277,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); - oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(result.txnNum); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(); writeConflictRetry( opCtx, diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index e3d00084e90..a0817bad177 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -356,7 +356,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -418,7 +418,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, txnNum); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(txnNum)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -470,7 +470,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -539,7 +539,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) auto session = getSessionWithTxn(opCtx, sessionId1, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); @@ -551,7 +551,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) auto session = getSessionWithTxn(opCtx, sessionId2, 42); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(42)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -615,7 +615,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplog(oplog2, historyIter.next(opCtx)); @@ -666,7 +666,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -755,7 +755,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -847,7 +847,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -947,7 +947,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { auto session = getSessionWithTxn(opCtx, sessionId, 20); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); ASSERT_BSONOBJ_EQ(BSON("_id" @@ -1009,7 +1009,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt auto session = getSessionWithTxn(opCtx, sessionId, 20); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); ASSERT_BSONOBJ_EQ(BSON("_id" @@ -1189,7 +1189,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, auto session = getSessionWithTxn(opCtx, sessionId, 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); @@ -1493,7 +1493,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem auto session = getSessionWithTxn(opCtx, sessionId, 19); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(19)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -1558,7 +1558,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory auto session = getSessionWithTxn(opCtx, *sessionInfo.getSessionId(), 2); const auto txnParticipant = TransactionParticipant::get(session.get()); - TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx)); @@ -1641,7 +1641,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, { auto session1 = getSessionWithTxn(opCtx, *sessionInfo1.getSessionId(), 3); const auto txnParticipant1 = TransactionParticipant::get(session1.get()); - ASSERT(txnParticipant1->getLastWriteOpTime(3).isNull()); + ASSERT(txnParticipant1->getLastWriteOpTime().isNull()); } // Check session 2 was correctly updated @@ -1649,7 +1649,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, auto session2 = getSessionWithTxn(opCtx, *sessionInfo2.getSessionId(), 15); const auto txnParticipant2 = TransactionParticipant::get(session2.get()); - TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime(15)); + TransactionHistoryIterator historyIter(txnParticipant2->getLastWriteOpTime()); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplogEntries[2], historyIter.next(opCtx)); diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index aa01c5e0aa2..2dd2dba889f 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1590,6 +1590,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN } _activeTxnNumber = txnNumber; + _lastWriteOpTime = repl::OpTime(); // Reset the retryable writes state _resetRetryableWriteState(wl); @@ -1614,10 +1615,11 @@ void TransactionParticipant::refreshFromStorageIfNeeded() { stdx::lock_guard<stdx::mutex> lg(_mutex); - _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); + const auto& lastTxnRecord = activeTxnHistory.lastTxnRecord; - if (_lastWrittenSessionRecord) { - _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); + if (lastTxnRecord) { + _activeTxnNumber = lastTxnRecord->getTxnNum(); + _lastWriteOpTime = lastTxnRecord->getLastWriteOpTime(); _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; @@ -1654,15 +1656,14 @@ void TransactionParticipant::onWriteOpCompletedOnPrimary( } const auto updateRequest = - _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); + _makeUpdateRequest(lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); ul.unlock(); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit( - opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + _registerUpdateCacheOnCommit(std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx, @@ -1678,24 +1679,23 @@ void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx _checkValid(ul); _checkIsActiveTransaction(ul, txnNumber); - // We do not migrate transaction oplog entries. - auto txnState = boost::none; - const auto updateRequest = _makeUpdateRequest( - ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState); + // We do not migrate transaction oplog entries so don't set the txn state + const auto txnState = boost::none; + const auto updateRequest = + _makeUpdateRequest(lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState); ul.unlock(); repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit( - opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); + _registerUpdateCacheOnCommit(std::move(stmtIdsWritten), lastStmtIdWriteOpTime); } void TransactionParticipant::_invalidate(WithLock) { _isValid = false; _activeTxnNumber = kUninitializedTxnNumber; - _lastWrittenSessionRecord.reset(); + _lastWriteOpTime = repl::OpTime(); // Reset the transactions metrics. stdx::lock_guard<stdx::mutex> lm(_metricsMutex); @@ -1767,15 +1767,9 @@ void TransactionParticipant::abortPreparedTransactionForRollback() { _resetTransactionState(lg, TransactionState::kNone); } -repl::OpTime TransactionParticipant::getLastWriteOpTime(TxnNumber txnNumber) const { +repl::OpTime TransactionParticipant::getLastWriteOpTime() const { stdx::lock_guard<stdx::mutex> lg(_mutex); - _checkValid(lg); - _checkIsActiveTransaction(lg, txnNumber); - - if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) - return {}; - - return _lastWrittenSessionRecord->getLastWriteOpTime(); + return _lastWriteOpTime; } boost::optional<repl::OplogEntry> TransactionParticipant::checkStatementExecuted( @@ -1832,15 +1826,10 @@ boost::optional<repl::OpTime> TransactionParticipant::_checkStatementExecuted(St return boost::none; } - invariant(_lastWrittenSessionRecord); - invariant(_lastWrittenSessionRecord->getTxnNum() == _activeTxnNumber); - return it->second; } UpdateRequest TransactionParticipant::_makeUpdateRequest( - WithLock, - TxnNumber newTxnNumber, const repl::OpTime& newLastWriteOpTime, Date_t newLastWriteDate, boost::optional<DurableTxnStateEnum> newState) const { @@ -1849,7 +1838,7 @@ UpdateRequest TransactionParticipant::_makeUpdateRequest( const auto updateBSON = [&] { SessionTxnRecord newTxnRecord; newTxnRecord.setSessionId(_sessionId()); - newTxnRecord.setTxnNum(newTxnNumber); + newTxnRecord.setTxnNum(_activeTxnNumber); newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); newTxnRecord.setLastWriteDate(newLastWriteDate); newTxnRecord.setState(newState); @@ -1863,63 +1852,36 @@ UpdateRequest TransactionParticipant::_makeUpdateRequest( } void TransactionParticipant::_registerUpdateCacheOnCommit( - OperationContext* opCtx, - TxnNumber newTxnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit( - [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( + std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime) { + _opCtx()->recoveryUnit()->onCommit( + [ this, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( boost::optional<Timestamp>) { + invariant(_isValid); + RetryableWritesStats::get(getGlobalServiceContext()) ->incrementTransactionsCollectionWriteCount(); stdx::lock_guard<stdx::mutex> lg(_mutex); - if (!_isValid) - return; - // The cache of the last written record must always be advanced after a write so that // subsequent writes have the correct point to start from. - if (!_lastWrittenSessionRecord) { - _lastWrittenSessionRecord.emplace(); - - _lastWrittenSessionRecord->setSessionId(_sessionId()); - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } else { - if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + _lastWriteOpTime = lastStmtIdWriteOpTime; - if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } - - if (newTxnNumber > _activeTxnNumber) { - // This call is necessary in order to advance the txn number and reset the cached - // state in the case where just before the storage transaction commits, the cache - // entry gets invalidated and immediately refreshed while there were no writes for - // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber - // and we will fail to update the cache even though the write was successful. - _beginOrContinueRetryableWrite(lg, newTxnNumber); - } + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } - if (newTxnNumber == _activeTxnNumber) { - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - _hasIncompleteHistory = true; - continue; - } - - const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(_sessionId(), - newTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); - } + const auto insertRes = + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(_sessionId(), + _activeTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); } } }); @@ -1929,14 +1891,15 @@ void TransactionParticipant::_registerUpdateCacheOnCommit( const auto closeConnectionElem = data["closeConnection"]; if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) { - opCtx->getClient()->session()->end(); + _opCtx()->getClient()->session()->end(); } const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"]; if (!failBeforeCommitExceptionElem.eoo()) { const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); uasserted(failureCode, - str::stream() << "Failing write for " << _sessionId() << ":" << newTxnNumber + str::stream() << "Failing write for " << _sessionId() << ":" + << _activeTxnNumber << " due to failpoint. The write must not be reflected."); } } diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 234e27f2b44..b8ac97b7e5f 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -395,7 +395,7 @@ public: * * Throws if the session has been invalidated or the active transaction number doesn't match. */ - repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; + repl::OpTime getLastWriteOpTime() const; /** * Returns the prepare op time that was selected for the transaction, which can be Null if the @@ -644,15 +644,11 @@ private: boost::optional<repl::OpTime> _checkStatementExecuted(StmtId stmtId) const; - UpdateRequest _makeUpdateRequest(WithLock, - TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteOpTime, + UpdateRequest _makeUpdateRequest(const repl::OpTime& newLastWriteOpTime, Date_t newLastWriteDate, boost::optional<DurableTxnStateEnum> newState) const; - void _registerUpdateCacheOnCommit(OperationContext* opCtx, - TxnNumber newTxnNumber, - std::vector<StmtId> stmtIdsWritten, + void _registerUpdateCacheOnCommit(std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); // Called for speculative transactions to fix the optime of the snapshot to read from. @@ -772,6 +768,9 @@ private: // means a new transaction has begun on the session, but it hasn't yet performed any writes. TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; + // Caches what is known to be the last optime written for the active transaction. + repl::OpTime _lastWriteOpTime; + // Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long // a snapshot will remain open and available. Checked in combination with _txnState to determine // whether the transaction should be aborted. @@ -827,9 +826,6 @@ private: // truncated because it was too old. bool _hasIncompleteHistory{false}; - // Caches what is known to be the last written transaction record for the session - boost::optional<SessionTxnRecord> _lastWrittenSessionRecord; - // For the active txn, tracks which statement ids have been committed and at which oplog // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 638f931ef20..0a7f938b391 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -242,11 +242,11 @@ protected: txnRecordObj.hasField(SessionTxnRecord::kStateFieldName)); const auto txnParticipant = TransactionParticipant::get(session); - ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime()); txnParticipant->invalidate(); txnParticipant->refreshFromStorageIfNeeded(); - ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime()); } private: @@ -260,7 +260,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) const TxnNumber txnNum = 20; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); + ASSERT(txnParticipant->getLastWriteOpTime().isNull()); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -292,7 +292,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit ASSERT_EQ(txnNum, txnRecord.getTxnNum()); ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); ASSERT(!txnRecord.getState()); - ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime()); } TEST_F(TransactionParticipantRetryableWritesTest, @@ -318,11 +318,11 @@ TEST_F(TransactionParticipantRetryableWritesTest, ASSERT_EQ(200, txnRecord.getTxnNum()); ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); ASSERT(!txnRecord.getState()); - ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); + ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime()); txnParticipant->invalidate(); txnParticipant->refreshFromStorageIfNeeded(); - ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); + ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime()); } TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) { @@ -349,7 +349,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNum - 1, boost::none, boost::none), AssertionException, ErrorCodes::TransactionTooOld); - ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); + ASSERT(txnParticipant->getLastWriteOpTime().isNull()); } TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) { @@ -464,31 +464,6 @@ DEATH_TEST_F(TransactionParticipantRetryableWritesTest, opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); } -TEST_F(TransactionParticipantRetryableWritesTest, - WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { - const auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant->refreshFromStorageIfNeeded(); - - const TxnNumber txnNum = 100; - txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - - { - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto uuid = UUID::gen(); - const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); - txnParticipant->onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); - - txnParticipant->invalidate(); - - wuow.commit(); - } - - txnParticipant->refreshFromStorageIfNeeded(); - ASSERT(txnParticipant->checkStatementExecuted(0)); -} - TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { const auto sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 2; |