diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-09-27 06:05:54 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-08 02:38:48 -0400 |
commit | b272bf351c39677d1e87d5c7fcd8b15b61465012 (patch) | |
tree | 9a92c09de2c9eb4244ca4b97d320f9d1e70637af /src/mongo/db | |
parent | 07066a49b935a538ed54716fdd9a98d40c31fba4 (diff) | |
download | mongo-b272bf351c39677d1e87d5c7fcd8b15b61465012.tar.gz |
SERVER-36799 Move all transactions and retryable writes functionality from Session into TransactionParticipant
This change leaves the Session class to be a plain decorable structure
only used for serialization of operations on the same logical session.
Diffstat (limited to 'src/mongo/db')
23 files changed, 1172 insertions, 1304 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 8f803deac3a..50b21051fdc 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1962,13 +1962,12 @@ env.CppUnitTest( target='sessions_test', source=[ 'session_catalog_test.cpp', - 'session_test.cpp', ], LIBDEPS=[ - 'query_exec', + '$BUILD_DIR/mongo/client/read_preference', '$BUILD_DIR/mongo/db/auth/authmocks', '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', - '$BUILD_DIR/mongo/client/read_preference' + 'query_exec', ], ) @@ -1976,6 +1975,7 @@ env.CppUnitTest( target='transaction_participant_test', source=[ 'transaction_participant_test.cpp', + 'transaction_participant_retryable_writes_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/client/read_preference', diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp index b56ad230b4d..c87fdf830ef 100644 --- a/src/mongo/db/commands/find_and_modify.cpp +++ b/src/mongo/db/commands/find_and_modify.cpp @@ -352,9 +352,9 @@ public: const auto stmtId = 0; if (opCtx->getTxnNumber() && !inTransaction) { - auto session = OperationContextSession::get(opCtx); + const auto txnParticipant = TransactionParticipant::get(opCtx); if (auto entry = - session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) { + txnParticipant->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) { RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount(); RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); parseOplogEntryForFindAndModify(opCtx, args, *entry, &result); diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index 3c21adbe3fa..34e52fd95ab 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -116,12 +116,13 @@ void onWriteOpCompleted(OperationContext* opCtx, return; if (session) { - session->onWriteOpCompletedOnPrimary(opCtx, - *opCtx->getTxnNumber(), - std::move(stmtIdsWritten), - lastStmtIdWriteOpTime, - lastStmtIdWriteDate, - txnState); + const auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant->onWriteOpCompletedOnPrimary(opCtx, + *opCtx->getTxnNumber(), + std::move(stmtIdsWritten), + lastStmtIdWriteOpTime, + lastStmtIdWriteDate, + txnState); } } @@ -186,7 +187,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx, if (session) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); } OpTimeBundle opTimes; @@ -250,7 +253,9 @@ OpTimeBundle replLogDelete(OperationContext* opCtx, if (session) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); } OpTimeBundle opTimes; @@ -934,8 +939,9 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, repl::OplogLink oplogLink; sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - StmtId stmtId(0); - oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); // Until we support multiple oplog entries per transaction, prevOpTime should always be null. invariant(oplogLink.prevOpTime.isNull()); @@ -947,6 +953,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx, applyOpsBuilder.append("prepare", true); } auto applyOpCmd = applyOpsBuilder.done(); + const StmtId stmtId(0); + auto times = replLogApplyOps( opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot); @@ -969,14 +977,15 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx, const OplogSlot& oplogSlot, const BSONObj& objectField, DurableTxnStateEnum durableState) { - invariant(session->isLockedTxnNumber(*opCtx->getTxnNumber())); const NamespaceString cmdNss{"admin", "$cmd"}; OperationSessionInfo sessionInfo; repl::OplogLink oplogLink; sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); const StmtId stmtId(1); const auto wallClockTime = getWallClockTimeForOpLog(opCtx); diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index d9c72a3b135..c5d202cee71 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -1,30 +1,30 @@ /** -* Copyright (C) 2017 MongoDB Inc. -* -* This program is free software: you can redistribute it and/or modify -* it under the terms of the GNU Affero General Public License, version 3, -* as published by the Free Software Foundation. -* -* This program is distributed in the hope that it will be useful, -* but WITHOUT ANY WARRANTY; without even the implied warranty of -* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -* GNU Affero General Public License for more details. -* -* You should have received a copy of the GNU Affero General Public License -* along with this program. If not, see <http://www.gnu.org/licenses/>. -* -* As a special exception, the copyright holders give permission to link the -* code of portions of this program with the OpenSSL library under certain -* conditions as described in each individual source file and distribute -* linked combinations including the program with the OpenSSL library. You -* must comply with the GNU Affero General Public License in all respects for -* all of the code used other than as permitted herein. If you modify file(s) -* with this exception, you may extend this exception to your version of the -* file(s), but you are not obligated to do so. If you do not wish to do so, -* delete this exception statement from your version. If you delete this -* exception statement from all source files in the program, then also delete -* it in the license file. -*/ + * Copyright (C) 2017 MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ #include "mongo/db/op_observer_impl.h" @@ -311,17 +311,17 @@ public: * statement id. */ void simulateSessionWrite(OperationContext* opCtx, - ScopedSession session, + TransactionParticipant* txnParticipant, NamespaceString nss, TxnNumber txnNum, StmtId stmtId) { - session->beginOrContinueTxn(opCtx, txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); { AutoGetCollection autoColl(opCtx, nss, MODE_IX); WriteUnitOfWork wuow(opCtx); auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp. - session->onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none); wuow.commit(); } @@ -337,22 +337,23 @@ TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessio auto sessionCatalog = SessionCatalog::get(getServiceContext()); auto sessionId = makeLogicalSessionIdForTest(); auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId); - session->refreshFromStorageIfNeeded(opCtx.get()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); // Simulate a write occurring on that session. const TxnNumber txnNum = 0; const StmtId stmtId = 1000; - simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId); + simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); // Check that the statement executed. - ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); // The OpObserver should invalidate in-memory session state, so the check after this should // fail. OpObserver::RollbackObserverInfo rbInfo; rbInfo.rollbackSessionIds = {UUID::gen()}; opObserver.onReplicationRollback(opCtx.get(), rbInfo); - ASSERT_THROWS_CODE(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId), + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId), DBException, ErrorCodes::ConflictingOperationInProgress); } @@ -367,21 +368,22 @@ TEST_F(OpObserverSessionCatalogTest, auto sessionCatalog = SessionCatalog::get(getServiceContext()); auto sessionId = makeLogicalSessionIdForTest(); auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId); - session->refreshFromStorageIfNeeded(opCtx.get()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); // Simulate a write occurring on that session. const TxnNumber txnNum = 0; const StmtId stmtId = 1000; - simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId); + simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); // Check that the statement executed. - ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); // The OpObserver should not invalidate the in-memory session state, so the check after this // should still succeed. OpObserver::RollbackObserverInfo rbInfo; opObserver.onReplicationRollback(opCtx.get(), rbInfo); - ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); } /** @@ -504,7 +506,10 @@ public: auto sessionCatalog = SessionCatalog::get(getServiceContext()); auto sessionId = makeLogicalSessionIdForTest(); _session = sessionCatalog->getOrCreateSession(opCtx(), sessionId); - _session->get()->refreshFromStorageIfNeeded(opCtx()); + + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session()); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); + opCtx()->setLogicalSessionId(sessionId); _opObserver.emplace(); _times.emplace(opCtx()); @@ -556,17 +561,13 @@ protected: ASSERT(txnRecord.getState() == txnState); ASSERT_EQ(txnState != boost::none, txnRecordObj.hasField(SessionTxnRecord::kStateFieldName)); + + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session()); if (!opTime.isNull()) { ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); - ASSERT_EQ(opTime, session()->getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); } else { - ASSERT_EQ(txnRecord.getLastWriteOpTime(), session()->getLastWriteOpTime(txnNum)); - } - - session()->invalidate(); - session()->refreshFromStorageIfNeeded(opCtx()); - if (!opTime.isNull()) { - ASSERT_EQ(opTime, session()->getLastWriteOpTime(txnNum)); + ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime(txnNum)); } } @@ -715,8 +716,6 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) { const auto prepareSlot = repl::getNextOpTime(opCtx()); prepareTimestamp = prepareSlot.opTime.getTimestamp(); opObserver().onTransactionPrepare(opCtx(), prepareSlot); - session()->lockTxnNumber(txnNum, - Status(ErrorCodes::OperationFailed, "unittest lock failed")); commitSlot = repl::getNextOpTime(opCtx()); } @@ -785,9 +784,6 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) { txnParticipant->transitionToPreparedforTest(); const auto prepareSlot = repl::getNextOpTime(opCtx()); opObserver().onTransactionPrepare(opCtx(), prepareSlot); - session()->lockTxnNumber(txnNum, - Status(ErrorCodes::OperationFailed, "unittest lock failed")); - abortSlot = repl::getNextOpTime(opCtx()); } @@ -944,10 +940,6 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction opObserver().onTransactionPrepare(opCtx(), slot); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); txnParticipant->transitionToPreparedforTest(); - session()->lockTxnNumber(txnNum, - {ErrorCodes::PreparedTransactionInProgress, - "unittest mock prepare transaction number lock"}); - abortSlot = repl::getNextOpTime(opCtx()); } @@ -957,7 +949,6 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction opObserver().onTransactionAbort(opCtx(), abortSlot); txnParticipant->transitionToAbortedforTest(); - session()->unlockTxnNumber(); txnParticipant->stashTransactionResources(opCtx()); // Abort the storage-transaction without calling the OpObserver. @@ -1027,9 +1018,6 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti opObserver().onTransactionPrepare(opCtx(), slot); opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp()); txnParticipant->transitionToPreparedforTest(); - session()->lockTxnNumber(txnNum, - {ErrorCodes::PreparedTransactionInProgress, - "unittest mock prepare transaction number lock"}); } OplogSlot commitSlot = repl::getNextOpTime(opCtx()); @@ -1041,8 +1029,6 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti opCtx()->lockState()->unsetMaxLockTimeout(); opObserver().onTransactionCommit(opCtx(), commitSlot, prepareOpTime.getTimestamp()); - session()->unlockTxnNumber(); - assertTxnRecord(txnNum, commitOpTime, DurableTxnStateEnum::kCommitted); } diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp index 0e79480b31c..bcb7a889cd3 100644 --- a/src/mongo/db/operation_context_session_mongod.cpp +++ b/src/mongo/db/operation_context_session_mongod.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#include "mongo/platform/basic.h" + #include "mongo/db/operation_context_session_mongod.h" #include "mongo/db/transaction_coordinator_factory.h" @@ -40,12 +42,10 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o boost::optional<bool> coordinator) : _operationContextSession(opCtx, shouldCheckOutSession) { if (shouldCheckOutSession && !opCtx->getClient()->isInDirectClient()) { - auto session = OperationContextSession::get(opCtx); - invariant(session); + const auto txnParticipant = TransactionParticipant::get(opCtx); + txnParticipant->refreshFromStorageIfNeeded(opCtx); - auto clientTxnNumber = *opCtx->getTxnNumber(); - session->refreshFromStorageIfNeeded(opCtx); - session->beginOrContinueTxn(opCtx, clientTxnNumber); + const auto clientTxnNumber = *opCtx->getTxnNumber(); if (startTransaction && *startTransaction) { // If this shard has been selected as the coordinator, set up the coordinator state @@ -55,7 +55,6 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o } } - auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction); } } @@ -64,16 +63,9 @@ OperationContextSessionMongodWithoutRefresh::OperationContextSessionMongodWithou OperationContext* opCtx) : _operationContextSession(opCtx, true /* checkout */) { invariant(!opCtx->getClient()->isInDirectClient()); - auto session = OperationContextSession::get(opCtx); - invariant(session); - - auto clientTxnNumber = *opCtx->getTxnNumber(); - // Session is refreshed, but the transaction participant isn't. - session->refreshFromStorageIfNeeded(opCtx); - session->beginOrContinueTxn(opCtx, clientTxnNumber); + const auto clientTxnNumber = *opCtx->getTxnNumber(); - auto txnParticipant = TransactionParticipant::get(opCtx); - invariant(txnParticipant); + const auto txnParticipant = TransactionParticipant::get(opCtx); txnParticipant->beginOrContinueTransactionUnconditionally(clientTxnNumber); } diff --git a/src/mongo/db/operation_context_session_mongod.h b/src/mongo/db/operation_context_session_mongod.h index 9c0a13e4e0e..f455a7d2ad0 100644 --- a/src/mongo/db/operation_context_session_mongod.h +++ b/src/mongo/db/operation_context_session_mongod.h @@ -54,11 +54,11 @@ private: }; /** - * Similar to OperationContextSessionMongod, but this starts a new transaction unconditionally - * without refreshing the state from disk. The session reloads the state from disk but - * the transaction participant will not use the on-disk state to refresh its in-memory state. + * Similar to OperationContextSessionMongod, but marks the TransactionParticipant as valid without + * refreshing from disk and starts a new transaction unconditionally. * - * This is used for transaction secondary application and recovery. + * NOTE: Only used by the replication oplog application logic on secondaries in order to replay + * prepared transactions. */ class OperationContextSessionMongodWithoutRefresh { public: diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp index d9dc53085b4..5a592da38a9 100644 --- a/src/mongo/db/ops/write_ops_exec.cpp +++ b/src/mongo/db/ops/write_ops_exec.cpp @@ -511,11 +511,9 @@ WriteResult performInserts(OperationContext* opCtx, } else { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - auto session = OperationContextSession::get(opCtx); - invariant(session); if (!txnParticipant->inMultiDocumentTransaction() && - session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), - stmtId)) { + txnParticipant->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), + stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); @@ -678,10 +676,9 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who for (auto&& singleOp : wholeOp.getUpdates()) { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - auto session = OperationContextSession::get(opCtx); if (!txnParticipant->inMultiDocumentTransaction()) { - if (auto entry = - session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) { + if (auto entry = txnParticipant->checkStatementExecuted( + opCtx, *opCtx->getTxnNumber(), stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); out.results.emplace_back(parseOplogEntryForUpdate(*entry)); @@ -821,9 +818,9 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who for (auto&& singleOp : wholeOp.getDeletes()) { const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++); if (opCtx->getTxnNumber()) { - auto session = OperationContextSession::get(opCtx); if (!txnParticipant->inMultiDocumentTransaction() && - session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), stmtId)) { + txnParticipant->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), + stmtId)) { containsRetry = true; RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount(); out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry()); diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp index cacb87ade6a..801a9dcd31b 100644 --- a/src/mongo/db/repl/oplog.cpp +++ b/src/mongo/db/repl/oplog.cpp @@ -538,7 +538,9 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx, if (session) { sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(*opCtx->getTxnNumber()); - oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber()); + + const auto txnParticipant = TransactionParticipant::get(opCtx); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber()); } auto timestamps = stdx::make_unique<Timestamp[]>(count); @@ -1200,7 +1202,6 @@ Status applyOperation_inlock(OperationContext* opCtx, } return true; }(); - invariant(!assignOperationTimestamp || !fieldTs.eoo(), str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op)); diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp index 4028d6980c0..0673d0dec2a 100644 --- a/src/mongo/db/repl/session_update_tracker.cpp +++ b/src/mongo/db/repl/session_update_tracker.cpp @@ -31,11 +31,77 @@ #include "mongo/db/repl/session_update_tracker.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/session.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/util/assert_util.h" namespace mongo { namespace repl { +namespace { + +/** + * Constructs a new oplog entry if the given entry has transaction state embedded within in. The new + * oplog entry will contain the operation needed to replicate the transaction table. + * + * Returns boost::none if the given oplog doesn't have any transaction state or does not support + * update to the transaction table. + */ +boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( + const repl::OplogEntry& entry) { + auto sessionInfo = entry.getOperationSessionInfo(); + if (!sessionInfo.getTxnNumber()) { + return boost::none; + } + + invariant(sessionInfo.getSessionId()); + invariant(entry.getWallClockTime()); + + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(*sessionInfo.getSessionId()); + newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); + newTxnRecord.setLastWriteOpTime(entry.getOpTime()); + newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); + + switch (entry.getCommandType()) { + case repl::OplogEntry::CommandType::kApplyOps: + newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared + : DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kCommitTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kCommitted); + break; + case repl::OplogEntry::CommandType::kAbortTransaction: + newTxnRecord.setState(DurableTxnStateEnum::kAborted); + break; + default: + break; + } + return newTxnRecord.toBSON(); + }(); + + return repl::OplogEntry( + entry.getOpTime(), + 0, // hash + repl::OpTypeEnum::kUpdate, + NamespaceString::kSessionTransactionsTableNamespace, + boost::none, // uuid + false, // fromMigrate + repl::OplogEntry::kOplogVersion, + updateBSON, + BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), + {}, // sessionInfo + true, // upsert + *entry.getWallClockTime(), + boost::none, // statementId + boost::none, // prevWriteOpTime + boost::none, // preImangeOpTime + boost::none // postImageOpTime + ); +} + +} // namespace boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush( const OplogEntry& entry) { @@ -96,7 +162,7 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() { std::vector<OplogEntry> opList; for (auto&& entry : _sessionsToUpdate) { - auto newUpdate = Session::createMatchingTransactionTableUpdate(entry.second); + auto newUpdate = createMatchingTransactionTableUpdate(entry.second); invariant(newUpdate); opList.push_back(std::move(*newUpdate)); } @@ -117,7 +183,7 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate( } std::vector<OplogEntry> opList; - auto updateOplog = Session::createMatchingTransactionTableUpdate(iter->second); + auto updateOplog = createMatchingTransactionTableUpdate(iter->second); invariant(updateOplog); opList.push_back(std::move(*updateOplog)); _sessionsToUpdate.erase(iter); diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp index 5025b48ff23..9a2f06d1039 100644 --- a/src/mongo/db/repl/sync_tail.cpp +++ b/src/mongo/db/repl/sync_tail.cpp @@ -73,6 +73,7 @@ #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/stats/timer_stats.h" #include "mongo/db/storage/recovery_unit.h" +#include "mongo/db/transaction_participant.h" #include "mongo/stdx/memory.h" #include "mongo/util/exit.h" #include "mongo/util/fail_point_service.h" @@ -82,11 +83,7 @@ #include "mongo/util/scopeguard.h" namespace mongo { - -using std::endl; - namespace repl { - namespace { MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion); @@ -1005,12 +1002,12 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplog bool ok = missingObjReader.connect(*source); if (!ok) { warning() << "network problem detected while connecting to the " - << "sync source, attempt " << retryCount << " of " << retryMax << endl; + << "sync source, attempt " << retryCount << " of " << retryMax; continue; // try again } } catch (const NetworkException&) { warning() << "network problem detected while connecting to the " - << "sync source, attempt " << retryCount << " of " << retryMax << endl; + << "sync source, attempt " << retryCount << " of " << retryMax; continue; // try again } @@ -1037,10 +1034,10 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplog } } catch (const NetworkException&) { warning() << "network problem detected while fetching a missing document from the " - << "sync source, attempt " << retryCount << " of " << retryMax << endl; + << "sync source, attempt " << retryCount << " of " << retryMax; continue; // try again } catch (DBException& e) { - error() << "assertion fetching missing object: " << redact(e) << endl; + error() << "assertion fetching missing object: " << redact(e); throw; } diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp index 32cb9afa7f2..91944c0ebe7 100644 --- a/src/mongo/db/repl/sync_tail_test.cpp +++ b/src/mongo/db/repl/sync_tail_test.cpp @@ -63,6 +63,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/stdx/mutex.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index ede0ae0f8b5..9d7307eca00 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -247,20 +247,20 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, invariant(oplogEntry.getWallClockTime()); auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); - scopedSession->refreshFromStorageIfNeeded(opCtx); - if (!scopedSession->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) { + auto const txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); + txnParticipant->refreshFromStorageIfNeeded(opCtx); + + if (!txnParticipant->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) { // Don't continue migrating the transaction history return lastResult; } - auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); - txnParticipant->checkForNewTxnNumber(); - BSONObj object(result.isPrePostImage ? oplogEntry.getObject() : BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1)); auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry); - oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum); + oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(result.txnNum); writeConflictRetry( opCtx, @@ -301,7 +301,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, // Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the // next oplog will contain the real operation if (!result.isPrePostImage) { - scopedSession->onMigrateCompletedOnPrimary( + txnParticipant->onMigrateCompletedOnPrimary( opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime()); } diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 7ca1a4a6294..576290f1ca4 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -46,6 +46,7 @@ #include "mongo/db/session_catalog.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" +#include "mongo/db/transaction_participant.h" #include "mongo/executor/remote_command_request.h" #include "mongo/s/catalog/sharding_catalog_client_mock.h" #include "mongo/s/catalog/type_shard.h" @@ -172,7 +173,9 @@ public: const LogicalSessionId& sessionId, const TxnNumber& txnNum) { auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId); - scopedSession->beginOrContinueTxn(opCtx, txnNum); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); return scopedSession; } @@ -201,7 +204,8 @@ public: Session* session, TxnNumber txnNumber, StmtId stmtId) { - auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session); + auto oplog = txnParticipant->checkStatementExecuted(opCtx, txnNumber, stmtId); ASSERT_TRUE(oplog); } @@ -210,7 +214,8 @@ public: TxnNumber txnNumber, StmtId stmtId, repl::OplogEntry& expectedOplog) { - auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session); + auto oplog = txnParticipant->checkStatementExecuted(opCtx, txnNumber, stmtId); ASSERT_TRUE(oplog); checkOplogWithNestedOplog(expectedOplog, *oplog); } @@ -355,7 +360,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) { finishSessionExpectSuccess(&sessionMigration); auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -416,7 +422,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, txnNum); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(txnNum)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(txnNum)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -467,7 +474,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -534,7 +542,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { auto session = getSessionWithTxn(opCtx, sessionId1, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx)); ASSERT_FALSE(historyIter.hasNext()); @@ -544,8 +555,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession) { auto session = getSessionWithTxn(opCtx, sessionId2, 42); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(42)); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(42)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -607,7 +620,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog) auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplog(oplog2, historyIter.next(opCtx)); @@ -656,8 +671,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -744,8 +760,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -835,8 +852,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); auto nextOplog = historyIter.next(opCtx); @@ -934,8 +952,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) { ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState()); auto session = getSessionWithTxn(opCtx, sessionId, 20); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20)); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); ASSERT_BSONOBJ_EQ(BSON("_id" @@ -995,8 +1014,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt finishSessionExpectSuccess(&sessionMigration); auto session = getSessionWithTxn(opCtx, sessionId, 20); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20)); ASSERT_TRUE(historyIter.hasNext()); auto oplog = historyIter.next(opCtx); ASSERT_BSONOBJ_EQ(BSON("_id" @@ -1174,8 +1194,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, finishSessionExpectSuccess(&sessionMigration); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx)); @@ -1477,8 +1498,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem finishSessionExpectSuccess(&sessionMigration); auto session = getSessionWithTxn(opCtx, sessionId, 19); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(19)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(19)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -1517,13 +1539,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory Date_t::now(), // wall clock time 23); // statement id - auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime - OpTypeEnum::kNoop, // op type - {}, // o - Session::kDeadEndSentinel, // o2 - sessionInfo, // session info - Date_t::now(), // wall clock time - kIncompleteHistoryStmtId); // statement id + auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime + OpTypeEnum::kNoop, // op type + {}, // o + TransactionParticipant::kDeadEndSentinel, // o2 + sessionInfo, // session info + Date_t::now(), // wall clock time + kIncompleteHistoryStmtId); // statement id auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime OpTypeEnum::kInsert, // op type @@ -1544,8 +1566,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory auto opCtx = operationContext(); auto session = getSessionWithTxn(opCtx, sessionId, 2); - TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); + TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2)); ASSERT_TRUE(historyIter.hasNext()); checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx)); @@ -1559,7 +1582,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1); checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3); - ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException); + ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx, 2, 38), AssertionException); } } // namespace diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp index 6e1ffc21134..9835843a9aa 100644 --- a/src/mongo/db/s/session_catalog_migration_source.cpp +++ b/src/mongo/db/s/session_catalog_migration_source.cpp @@ -40,6 +40,7 @@ #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/transaction_history_iterator.h" +#include "mongo/db/transaction_participant.h" #include "mongo/db/write_concern.h" #include "mongo/platform/random.h" #include "mongo/stdx/memory.h" @@ -106,14 +107,14 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, * Creates a special "write history lost" sentinel oplog entry. */ repl::OplogEntry makeSentinelOplogEntry(OperationSessionInfo sessionInfo, Date_t wallClockTime) { - return makeOplogEntry({}, // optime - hashGenerator.nextInt64(), // hash - repl::OpTypeEnum::kNoop, // op type - {}, // o - Session::kDeadEndSentinel, // o2 - sessionInfo, // session info - wallClockTime, // wall clock time - kIncompleteHistoryStmtId); // statement id + return makeOplogEntry({}, // optime + hashGenerator.nextInt64(), // hash + repl::OpTypeEnum::kNoop, // op type + {}, // o + TransactionParticipant::kDeadEndSentinel, // o2 + sessionInfo, // session info + wallClockTime, // wall clock time + kIncompleteHistoryStmtId); // statement id } } // namespace diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp index dc2ae5677de..b8a99afb851 100644 --- a/src/mongo/db/s/session_catalog_migration_source_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp @@ -35,6 +35,7 @@ #include "mongo/db/s/session_catalog_migration_source.h" #include "mongo/db/session.h" #include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/transaction_participant.h" #include "mongo/executor/remote_command_request.h" #include "mongo/unittest/unittest.h" @@ -593,7 +594,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis auto oplog = *nextOplogResult.oplog; ASSERT_TRUE(oplog.getObject2()); - ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2()); + ASSERT_BSONOBJ_EQ(TransactionParticipant::kDeadEndSentinel, *oplog.getObject2()); ASSERT_TRUE(oplog.getStatementId()); ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId()); ASSERT_TRUE(oplog.getWallClockTime()); diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 8182d161303..085de43e94b 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -26,683 +26,14 @@ * it in the license file. */ -#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage -#define LOG_FOR_TRANSACTION(level) \ - MONGO_LOG_COMPONENT(level, ::mongo::logger::LogComponent::kTransaction) #include "mongo/platform/basic.h" #include "mongo/db/session.h" -#include "mongo/db/catalog/index_catalog.h" -#include "mongo/db/concurrency/lock_state.h" -#include "mongo/db/concurrency/locker.h" -#include "mongo/db/concurrency/write_conflict_exception.h" -#include "mongo/db/db_raii.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/index/index_access_method.h" -#include "mongo/db/namespace_string.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/ops/update.h" -#include "mongo/db/query/get_executor.h" -#include "mongo/db/retryable_writes_stats.h" -#include "mongo/db/transaction_history_iterator.h" -#include "mongo/stdx/memory.h" -#include "mongo/transport/transport_layer.h" -#include "mongo/util/fail_point_service.h" -#include "mongo/util/log.h" -#include "mongo/util/mongoutils/str.h" - namespace mongo { -namespace { - -void fassertOnRepeatedExecution(const LogicalSessionId& lsid, - TxnNumber txnNumber, - StmtId stmtId, - const repl::OpTime& firstOpTime, - const repl::OpTime& secondOpTime) { - severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":" - << txnNumber << " ] was committed once with opTime " << firstOpTime - << " and a second time with opTime " << secondOpTime - << ". This indicates possible data corruption or server bug and the process will be " - "terminated."; - fassertFailed(40526); -} - -struct ActiveTransactionHistory { - boost::optional<SessionTxnRecord> lastTxnRecord; - Session::CommittedStatementTimestampMap committedStatements; - bool transactionCommitted{false}; - bool hasIncompleteHistory{false}; -}; - -ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, - const LogicalSessionId& lsid) { - ActiveTransactionHistory result; - - result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> { - DBDirectClient client(opCtx); - auto result = - client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), - {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); - if (result.isEmpty()) { - return boost::none; - } - - return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"), - result); - }(); - - if (!result.lastTxnRecord) { - return result; - } - - auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime()); - while (it.hasNext()) { - try { - const auto entry = it.next(opCtx); - invariant(entry.getStatementId()); - - if (*entry.getStatementId() == kIncompleteHistoryStmtId) { - // Only the dead end sentinel can have this id for oplog write history - invariant(entry.getObject2()); - invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0); - result.hasIncompleteHistory = true; - continue; - } - - const auto insertRes = - result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(lsid, - result.lastTxnRecord->getTxnNum(), - *entry.getStatementId(), - existingOpTime, - entry.getOpTime()); - } - - // applyOps oplog entry marks the commit of a transaction. - if (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { - result.transactionCommitted = true; - } - } catch (const DBException& ex) { - if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { - result.hasIncompleteHistory = true; - break; - } - - throw; - } - } - - return result; -} - -void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) { - // Current code only supports replacement update. - dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates())); - - AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); - - uassert(40527, - str::stream() << "Unable to persist transaction state because the session transaction " - "collection is missing. This indicates that the " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " collection has been manually deleted.", - autoColl.getCollection()); - - WriteUnitOfWork wuow(opCtx); - - auto collection = autoColl.getCollection(); - auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx); - - uassert(40672, - str::stream() << "Failed to fetch _id index for " - << NamespaceString::kSessionTransactionsTableNamespace.ns(), - idIndex); - - auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex); - // Since we are looking up a key inside the _id index, create a key object consisting of only - // the _id field. - auto idToFetch = updateRequest.getQuery().firstElement(); - auto toUpdateIdDoc = idToFetch.wrap(); - dassert(idToFetch.fieldNameStringData() == "_id"_sd); - auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc); - auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId(); - - if (recordId.isNull()) { - // Upsert case. - auto status = collection->insertDocument( - opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false); - - if (status == ErrorCodes::DuplicateKey) { - throw WriteConflictException(); - } - - uassertStatusOK(status); - wuow.commit(); - return; - } - - auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId); - auto originalDoc = originalRecordData.toBson(); - - invariant(collection->getDefaultCollator() == nullptr); - boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr)); - - auto matcher = - fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx))); - if (!matcher->matchesBSON(originalDoc)) { - // Document no longer match what we expect so throw WCE to make the caller re-examine. - throw WriteConflictException(); - } - - CollectionUpdateArgs args; - args.update = updateRequest.getUpdates(); - args.criteria = toUpdateIdDoc; - args.fromMigrate = false; - - collection->updateDocument(opCtx, - recordId, - Snapshotted<BSONObj>(startingSnapshotId, originalDoc), - updateRequest.getUpdates(), - false, // indexesAffected = false because _id is the only index - nullptr, - &args); - - wuow.commit(); -} - -// Failpoint which allows different failure actions to happen after each write. Supports the -// parameters below, which can be combined with each other (unless explicitly disallowed): -// -// closeConnection (bool, default = true): Closes the connection on which the write was executed. -// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception -// code will be thrown, which will cause the write to not commit; if not specified, the write -// will be allowed to commit. -MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite); - -} // namespace - -const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); - Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} -void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) { - if (opCtx->getClient()->isInDirectClient()) { - return; - } - - invariant(!opCtx->lockState()->isLocked()); - invariant(repl::ReadConcernArgs::get(opCtx).getLevel() == - repl::ReadConcernLevel::kLocalReadConcern); - - stdx::unique_lock<stdx::mutex> ul(_mutex); - - while (!_isValid) { - const int numInvalidations = _numInvalidations; - - ul.unlock(); - - auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId); - - ul.lock(); - - // Protect against concurrent refreshes or invalidations - if (!_isValid && _numInvalidations == numInvalidations) { - _isValid = true; - _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); - - if (_lastWrittenSessionRecord) { - if (!_lastRefreshState) { - _lastRefreshState.emplace(); - } - - _lastRefreshState->refreshCount++; - - _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); - _lastRefreshState->txnNumber = _activeTxnNumber; - - _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); - _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; - _lastRefreshState->isCommitted = activeTxnHistory.transactionCommitted; - } - - break; - } - } -} - -void Session::beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber) { - stdx::lock_guard<stdx::mutex> lg(_mutex); - _beginOrContinueTxn(lg, txnNumber); -} - -void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState) { - invariant(opCtx->lockState()->inAWriteUnitOfWork()); - - stdx::unique_lock<stdx::mutex> ul(_mutex); - - // Sanity check that we don't double-execute statements - for (const auto stmtId : stmtIdsWritten) { - const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); - if (stmtOpTime) { - fassertOnRepeatedExecution( - _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); - } - } - - const auto updateRequest = - _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); - - ul.unlock(); - - repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); - - updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit( - opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); -} - -bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) { - beginOrContinueTxn(opCtx, txnNumber); - - try { - if (checkStatementExecuted(opCtx, txnNumber, stmtId)) { - return false; - } - } catch (const DBException& ex) { - // If the transaction chain was truncated on the recipient shard, then we - // are most likely copying from a session that hasn't been touched on the - // recipient shard for a very long time but could be recent on the donor. - // We continue copying regardless to get the entire transaction from the donor. - if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { - throw; - } - if (stmtId == kIncompleteHistoryStmtId) { - return false; - } - } - - return true; -} - -void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t oplogLastStmtIdWriteDate) { - invariant(opCtx->lockState()->inAWriteUnitOfWork()); - - stdx::unique_lock<stdx::mutex> ul(_mutex); - - _checkValid(ul); - _checkIsActiveTransaction(ul, txnNumber); - - // We do not migrate transaction oplog entries. - auto txnState = boost::none; - const auto updateRequest = _makeUpdateRequest( - ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState); - - ul.unlock(); - - repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); - - updateSessionEntry(opCtx, updateRequest); - _registerUpdateCacheOnCommit( - opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); -} - -void Session::invalidate() { - stdx::lock_guard<stdx::mutex> lg(_mutex); - - if (_isTxnNumberLocked) { - invariant(_txnNumberLockConflictStatus); - uasserted(50908, - str::stream() << "cannot invalidate session because txnNumber is locked: " - << *_txnNumberLockConflictStatus); - } - - _isValid = false; - _numInvalidations++; - - _lastWrittenSessionRecord.reset(); - - _activeTxnNumber = kUninitializedTxnNumber; - _activeTxnCommittedStatements.clear(); - _hasIncompleteHistory = false; -} - -repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const { - stdx::lock_guard<stdx::mutex> lg(_mutex); - _checkValid(lg); - _checkIsActiveTransaction(lg, txnNumber); - - if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) - return {}; - - return _lastWrittenSessionRecord->getLastWriteOpTime(); -} - -boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx, - TxnNumber txnNumber, - StmtId stmtId) const { - const auto stmtTimestamp = [&] { - stdx::lock_guard<stdx::mutex> lg(_mutex); - return _checkStatementExecuted(lg, txnNumber, stmtId); - }(); - - if (!stmtTimestamp) - return boost::none; - - TransactionHistoryIterator txnIter(*stmtTimestamp); - while (txnIter.hasNext()) { - const auto entry = txnIter.next(opCtx); - invariant(entry.getStatementId()); - if (*entry.getStatementId() == stmtId) - return entry; - } - - MONGO_UNREACHABLE; -} - -bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const { - stdx::lock_guard<stdx::mutex> lg(_mutex); - return bool(_checkStatementExecuted(lg, txnNumber, stmtId)); -} - -void Session::_beginOrContinueTxn(WithLock wl, TxnNumber txnNumber) { - - // Check whether the session information needs to be refreshed from disk. - _checkValid(wl); - - // Check if the given transaction number is valid for this session. The transaction number must - // be >= the active transaction number. - _checkTxnValid(wl, txnNumber); - - // - // Continue an active transaction. - // - if (txnNumber == _activeTxnNumber) { - return; - } - - invariant(txnNumber > _activeTxnNumber); - _setActiveTxn(wl, txnNumber); - - LOG_FOR_TRANSACTION(4) << "New transaction started with txnNumber: " << txnNumber - << " on session with lsid " << getSessionId().getId(); -} - -void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const { - uassert(ErrorCodes::TransactionTooOld, - str::stream() << "Cannot start transaction " << txnNumber << " on session " - << getSessionId() - << " because a newer transaction " - << _activeTxnNumber - << " has already started.", - txnNumber >= _activeTxnNumber); -} - -void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) { - if (_isTxnNumberLocked) { - invariant(_txnNumberLockConflictStatus); - uassertStatusOK(*_txnNumberLockConflictStatus); - } - - _activeTxnNumber = txnNumber; - _activeTxnCommittedStatements.clear(); - _hasIncompleteHistory = false; -} - -void Session::_checkValid(WithLock) const { - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Session " << getSessionId() - << " was concurrently modified and the operation must be retried.", - _isValid); -} - -void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Cannot perform operations on transaction " << txnNumber - << " on session " - << getSessionId() - << " because a different transaction " - << _activeTxnNumber - << " is now active.", - txnNumber == _activeTxnNumber); -} - -boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl, - TxnNumber txnNumber, - StmtId stmtId) const { - _checkValid(wl); - _checkIsActiveTransaction(wl, txnNumber); - - const auto it = _activeTxnCommittedStatements.find(stmtId); - if (it == _activeTxnCommittedStatements.end()) { - uassert(ErrorCodes::IncompleteTransactionHistory, - str::stream() << "Incomplete history detected for transaction " << txnNumber - << " on session " - << _sessionId.toBSON(), - !_hasIncompleteHistory); - - return boost::none; - } - - invariant(_lastWrittenSessionRecord); - invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber); - - return it->second; -} - -Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const { - _checkValid(wl); - _checkIsActiveTransaction(wl, txnNumber); - - if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) - return {}; - - return _lastWrittenSessionRecord->getLastWriteDate(); -} - -UpdateRequest Session::_makeUpdateRequest(WithLock, - TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteOpTime, - Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState) const { - UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); - - const auto updateBSON = [&] { - SessionTxnRecord newTxnRecord; - newTxnRecord.setSessionId(_sessionId); - newTxnRecord.setTxnNum(newTxnNumber); - newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); - newTxnRecord.setLastWriteDate(newLastWriteDate); - newTxnRecord.setState(newState); - return newTxnRecord.toBSON(); - }(); - updateRequest.setUpdates(updateBSON); - updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON())); - updateRequest.setUpsert(true); - - return updateRequest; -} - -void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx, - TxnNumber newTxnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime) { - opCtx->recoveryUnit()->onCommit( - [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( - boost::optional<Timestamp>) { - RetryableWritesStats::get(getGlobalServiceContext()) - ->incrementTransactionsCollectionWriteCount(); - - stdx::lock_guard<stdx::mutex> lg(_mutex); - - if (!_isValid) - return; - - // The cache of the last written record must always be advanced after a write so that - // subsequent writes have the correct point to start from. - if (!_lastWrittenSessionRecord) { - _lastWrittenSessionRecord.emplace(); - - _lastWrittenSessionRecord->setSessionId(_sessionId); - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } else { - if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) - _lastWrittenSessionRecord->setTxnNum(newTxnNumber); - - if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) - _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); - } - - if (newTxnNumber > _activeTxnNumber) { - // This call is necessary in order to advance the txn number and reset the cached - // state in the case where just before the storage transaction commits, the cache - // entry gets invalidated and immediately refreshed while there were no writes for - // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber - // and we will fail to update the cache even though the write was successful. - _beginOrContinueTxn(lg, newTxnNumber); - } - - if (newTxnNumber == _activeTxnNumber) { - for (const auto stmtId : stmtIdsWritten) { - if (stmtId == kIncompleteHistoryStmtId) { - _hasIncompleteHistory = true; - continue; - } - - const auto insertRes = - _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); - if (!insertRes.second) { - const auto& existingOpTime = insertRes.first->second; - fassertOnRepeatedExecution(_sessionId, - newTxnNumber, - stmtId, - existingOpTime, - lastStmtIdWriteOpTime); - } - } - } - }); - - MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { - const auto& data = customArgs.getData(); - - const auto closeConnectionElem = data["closeConnection"]; - if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) { - opCtx->getClient()->session()->end(); - } - - const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"]; - if (!failBeforeCommitExceptionElem.eoo()) { - const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); - uasserted(failureCode, - str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber - << " due to failpoint. The write must not be reflected."); - } - } -} - -boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate( - const repl::OplogEntry& entry) { - auto sessionInfo = entry.getOperationSessionInfo(); - if (!sessionInfo.getTxnNumber()) { - return boost::none; - } - - invariant(sessionInfo.getSessionId()); - invariant(entry.getWallClockTime()); - - const auto updateBSON = [&] { - SessionTxnRecord newTxnRecord; - newTxnRecord.setSessionId(*sessionInfo.getSessionId()); - newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber()); - newTxnRecord.setLastWriteOpTime(entry.getOpTime()); - newTxnRecord.setLastWriteDate(*entry.getWallClockTime()); - - switch (entry.getCommandType()) { - case repl::OplogEntry::CommandType::kApplyOps: - newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared - : DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kCommitTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kCommitted); - break; - case repl::OplogEntry::CommandType::kAbortTransaction: - newTxnRecord.setState(DurableTxnStateEnum::kAborted); - break; - default: - break; - } - return newTxnRecord.toBSON(); - }(); - - return repl::OplogEntry( - entry.getOpTime(), - 0, // hash - repl::OpTypeEnum::kUpdate, - NamespaceString::kSessionTransactionsTableNamespace, - boost::none, // uuid - false, // fromMigrate - repl::OplogEntry::kOplogVersion, - updateBSON, - BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()), - {}, // sessionInfo - true, // upsert - *entry.getWallClockTime(), - boost::none, // statementId - boost::none, // prevWriteOpTime - boost::none, // preImangeOpTime - boost::none // postImageOpTime - ); -} - -boost::optional<Session::RefreshState> Session::getLastRefreshState() const { - stdx::lock_guard<stdx::mutex> lg(_mutex); - return _lastRefreshState; -} - -void Session::lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError) { - stdx::lock_guard<stdx::mutex> lg(_mutex); - uassert(50907, - str::stream() << "cannot lock txnNumber to " << lockThisNumber - << " because current txnNumber is " - << _activeTxnNumber, - _activeTxnNumber == lockThisNumber); - // TODO: remove this if we need to support recursive locking. - invariant(!_isTxnNumberLocked); - - _isTxnNumberLocked = true; - _txnNumberLockConflictStatus = conflictError; -} - -void Session::unlockTxnNumber() { - stdx::lock_guard<stdx::mutex> lg(_mutex); - - _isTxnNumberLocked = false; - _txnNumberLockConflictStatus = boost::none; -} - -bool Session::isLockedTxnNumber(const TxnNumber expectedLockedNumber) const { - stdx::lock_guard<stdx::mutex> lg(_mutex); - invariant(_activeTxnNumber == expectedLockedNumber, - str::stream() << "Expected TxnNumber: " << expectedLockedNumber - << ", Active TxnNumber: " - << _activeTxnNumber); - return _isTxnNumberLocked; -} - void Session::setCurrentOperation(OperationContext* currentOperation) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_currentOperation); diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 2f79b3784f8..7b0c1a86857 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -28,196 +28,32 @@ #pragma once -#include <boost/optional.hpp> - #include "mongo/base/disallow_copying.h" -#include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/repl/oplog.h" -#include "mongo/db/repl/oplog_entry.h" -#include "mongo/db/session_txn_record_gen.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/unordered_map.h" -#include "mongo/util/concurrency/with_lock.h" +#include "mongo/util/decorable.h" namespace mongo { class OperationContext; -class UpdateRequest; /** - * A write through cache for the state of a particular session. All modifications to the underlying - * session transactions collection must be performed through an object of this class. - * - * The cache state can be 'up-to-date' (it is in sync with the persistent contents) or 'needs - * refresh' (in which case refreshFromStorageIfNeeded needs to be called in order to make it - * up-to-date). + * A decorable container for state associated with an active session running on a MongoD or MongoS + * server. Refer to SessionCatalog for more information on the semantics of sessions. */ class Session : public Decorable<Session> { MONGO_DISALLOW_COPYING(Session); public: - using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; - - static const BSONObj kDeadEndSentinel; - explicit Session(LogicalSessionId sessionId); - const LogicalSessionId& getSessionId() const { - return _sessionId; - } - - struct RefreshState { - long long refreshCount{0}; - TxnNumber txnNumber{kUninitializedTxnNumber}; - bool isCommitted{false}; - }; - - /** - * Blocking method, which loads the transaction state from storage if it has been marked as - * needing refresh. - * - * In order to avoid the possibility of deadlock, this method must not be called while holding a - * lock. - */ - void refreshFromStorageIfNeeded(OperationContext* opCtx); - - /** - * Starts a new transaction on the session, or continues an already active transaction. In this - * context, a "transaction" is a sequence of operations associated with a transaction number. - * - * Throws an exception if: - * - An attempt is made to start a transaction with number less than the latest - * transaction this session has seen. - * - The session has been invalidated. - * - * In order to avoid the possibility of deadlock, this method must not be called while holding a - * lock. This method must also be called after refreshFromStorageIfNeeded has been called. - */ - void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber); - - /** - * Called after a write under the specified transaction completes while the node is a primary - * and specifies the statement ids which were written. Must be called while the caller is still - * in the write's WUOW. Updates the on-disk state of the session to match the specified - * transaction/opTime and keeps the cached state in sync. - * - * 'txnState' is 'none' for retryable writes. - * - * Must only be called with the session checked-out. - * - * Throws if the session has been invalidated or the active transaction number doesn't match. - */ - void onWriteOpCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t lastStmtIdWriteDate, - boost::optional<DurableTxnStateEnum> txnState); - /** - * Helper function to begin a migration on a primary node. - * - * Returns whether the specified statement should be migrated at all or skipped. - * - * Not called with session checked out. + * The logical session id that this object represents. */ - bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId); - - /** - * Called after an entry for the specified session and transaction has been written to the oplog - * during chunk migration, while the node is still primary. Must be called while the caller is - * still in the oplog write's WUOW. Updates the on-disk state of the session to match the - * specified transaction/opTime and keeps the cached state in sync. - * - * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary - * and doesn't require the session to be checked-out. - * - * Throws if the session has been invalidated or the active transaction number is newer than the - * one specified. - */ - void onMigrateCompletedOnPrimary(OperationContext* opCtx, - TxnNumber txnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteOpTime, - Date_t oplogLastStmtIdWriteDate); - - /** - * Marks the session as requiring refresh. Used when the session state has been modified - * externally, such as through a direct write to the transactions table. - */ - void invalidate(); - - /** - * Returns the op time of the last committed write for this session and transaction. If no write - * has completed yet, returns an empty timestamp. - * - * Throws if the session has been invalidated or the active transaction number doesn't match. - */ - repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; - - /** - * Checks whether the given statementId for the specified transaction has already executed and - * if so, returns the oplog entry which was generated by that write. If the statementId hasn't - * executed, returns boost::none. - * - * Must only be called with the session checked-out. - * - * Throws if the session has been invalidated or the active transaction number doesn't match. - */ - boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx, - TxnNumber txnNumber, - StmtId stmtId) const; - - /** - * Checks whether the given statementId for the specified transaction has already executed - * without fetching the oplog entry which was generated by that write. - * - * Must only be called with the session checked-out. - * - * Throws if the session has been invalidated or the active transaction number doesn't match. - */ - bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; - - TxnNumber getActiveTxnNumber() const { - stdx::lock_guard<stdx::mutex> lk(_mutex); - return _activeTxnNumber; + const LogicalSessionId& getSessionId() const { + return _sessionId; } /** - * Returns a new oplog entry if the given entry has transaction state embedded within in. - * The new oplog entry will contain the operation needed to replicate the transaction - * table. - * Returns boost::none if the given oplog doesn't have any transaction state or does not - * support update to the transaction table. - */ - static boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate( - const repl::OplogEntry& entry); - - /** - * Returns the state of the session from storage the last time a refresh occurred. - */ - boost::optional<RefreshState> getLastRefreshState() const; - - /** - * Attempt to lock the active TxnNumber of this session to the given number. This operation - * can only succeed if it is equal to the current active TxnNumber. Also sets the error status - * for any callers trying to modify the TxnNumber. - */ - void lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError); - - /** - * Release the lock on the active TxnNumber and allow it to be modified. - */ - void unlockTxnNumber(); - - /** - * Returns if the given TxnNumber is locked. - */ - bool isLockedTxnNumber(const TxnNumber expectedLockedNumber) const; - - /** * Sets the current operation running on this Session. */ void setCurrentOperation(OperationContext* currentOperation); @@ -234,40 +70,7 @@ public: OperationContext* getCurrentOperation() const; private: - void _beginOrContinueTxn(WithLock, TxnNumber txnNumber); - - // Checks if there is a conflicting operation on the current Session - void _checkValid(WithLock) const; - - // Checks that a new txnNumber is higher than the activeTxnNumber so - // we don't start a txn that is too old. - void _checkTxnValid(WithLock, TxnNumber txnNumber) const; - - void _setActiveTxn(WithLock, TxnNumber txnNumber); - - void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const; - - boost::optional<repl::OpTime> _checkStatementExecuted(WithLock, - TxnNumber txnNumber, - StmtId stmtId) const; - - // Returns the write date of the last committed write for this session and transaction. If no - // write has completed yet, returns an empty date. - // - // Throws if the session has been invalidated or the active transaction number doesn't match. - Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const; - - UpdateRequest _makeUpdateRequest(WithLock, - TxnNumber newTxnNumber, - const repl::OpTime& newLastWriteTs, - Date_t newLastWriteDate, - boost::optional<DurableTxnStateEnum> newState) const; - - void _registerUpdateCacheOnCommit(OperationContext* opCtx, - TxnNumber newTxnNumber, - std::vector<StmtId> stmtIdsWritten, - const repl::OpTime& lastStmtIdWriteTs); - + // The id of the session with which this object is associated const LogicalSessionId _sessionId; // Protects the member variables below. @@ -276,40 +79,6 @@ private: // A pointer back to the currently running operation on this Session, or nullptr if there // is no operation currently running for the Session. OperationContext* _currentOperation{nullptr}; - - // Specifies whether the session information needs to be refreshed from storage - bool _isValid{false}; - - // Counter, incremented with each call to invalidate in order to discern invalidations, which - // happen during refresh - int _numInvalidations{0}; - - // Set to true if incomplete history is detected. For example, when the oplog to a write was - // truncated because it was too old. - bool _hasIncompleteHistory{false}; - - // Caches what is known to be the last written transaction record for the session - boost::optional<SessionTxnRecord> _lastWrittenSessionRecord; - - // Tracks the last seen txn number for the session and is always >= to the transaction number in - // the last written txn record. When it is > than that in the last written txn record, this - // means a new transaction has begun on the session, but it hasn't yet performed any writes. - TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; - - // For the active txn, tracks which statement ids have been committed and at which oplog - // opTime. Used for fast retryability check and retrieving the previous write's data without - // having to scan through the oplog. - CommittedStatementTimestampMap _activeTxnCommittedStatements; - - // Stores the state from last refresh. - boost::optional<RefreshState> _lastRefreshState; - - // True if txnNumber cannot be modified. - bool _isTxnNumberLocked{false}; - - // The status to return when an operation tries to modify the active TxnNumber while it is - // locked. - boost::optional<Status> _txnNumberLockConflictStatus; }; } // namespace mongo diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index d4d4634f36e..5d18e890666 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -173,7 +173,9 @@ void SessionCatalog::invalidateSessions(OperationContext* opCtx, const auto invalidateSessionFn = [&](WithLock, SessionRuntimeInfoMap::iterator it) { auto& sri = it->second; - sri->txnState.invalidate(); + auto const txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(&sri->txnState); + txnParticipant->invalidate(); // We cannot remove checked-out sessions from the cache, because operations expect to find // them there to check back in diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 97bad1e279e..365202ba309 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -35,6 +35,7 @@ #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" +#include "mongo/util/concurrency/with_lock.h" namespace mongo { diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index f2fe8df56a0..b3e1d73c17b 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -28,22 +28,34 @@ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage +#define LOG_FOR_TRANSACTION(level) \ + MONGO_LOG_COMPONENT(level, ::mongo::logger::LogComponent::kTransaction) + #include "mongo/platform/basic.h" #include "mongo/db/transaction_participant.h" +#include "mongo/db/catalog/index_catalog.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/locker.h" +#include "mongo/db/concurrency/write_conflict_exception.h" #include "mongo/db/curop_failpoint_helpers.h" +#include "mongo/db/db_raii.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/index/index_access_method.h" #include "mongo/db/op_observer.h" +#include "mongo/db/ops/update.h" +#include "mongo/db/query/get_executor.h" #include "mongo/db/repl/repl_client_info.h" +#include "mongo/db/retryable_writes_stats.h" #include "mongo/db/server_parameters.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/session.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/transaction_history_iterator.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_utils.h" @@ -85,7 +97,184 @@ const auto getTransactionParticipant = Session::declareDecoration<TransactionPar const StringMap<int> preparedTxnCmdWhitelist = { {"abortTransaction", 1}, {"commitTransaction", 1}, {"prepareTransaction", 1}}; -} // unnamed namespace +void fassertOnRepeatedExecution(const LogicalSessionId& lsid, + TxnNumber txnNumber, + StmtId stmtId, + const repl::OpTime& firstOpTime, + const repl::OpTime& secondOpTime) { + severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":" + << txnNumber << " ] was committed once with opTime " << firstOpTime + << " and a second time with opTime " << secondOpTime + << ". This indicates possible data corruption or server bug and the process will be " + "terminated."; + fassertFailed(40526); +} + +struct ActiveTransactionHistory { + boost::optional<SessionTxnRecord> lastTxnRecord; + TransactionParticipant::CommittedStatementTimestampMap committedStatements; + bool transactionCommitted{false}; + bool hasIncompleteHistory{false}; +}; + +ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx, + const LogicalSessionId& lsid) { + // Since we are using DBDirectClient to read the transactions table and the oplog, we should + // never be reading from a snapshot, but directly from what is the latest on disk. This + // invariant guards against programming errors where the default read concern on the + // OperationContext could have been changed to something other than 'local'. + invariant(repl::ReadConcernArgs::get(opCtx).getLevel() == + repl::ReadConcernLevel::kLocalReadConcern); + + ActiveTransactionHistory result; + + result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> { + DBDirectClient client(opCtx); + auto result = + client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())}); + if (result.isEmpty()) { + return boost::none; + } + + return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"), + result); + }(); + + if (!result.lastTxnRecord) { + return result; + } + + auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime()); + while (it.hasNext()) { + try { + const auto entry = it.next(opCtx); + invariant(entry.getStatementId()); + + if (*entry.getStatementId() == kIncompleteHistoryStmtId) { + // Only the dead end sentinel can have this id for oplog write history + invariant(entry.getObject2()); + invariant(entry.getObject2()->woCompare(TransactionParticipant::kDeadEndSentinel) == + 0); + result.hasIncompleteHistory = true; + continue; + } + + const auto insertRes = + result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime()); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(lsid, + result.lastTxnRecord->getTxnNum(), + *entry.getStatementId(), + existingOpTime, + entry.getOpTime()); + } + + // applyOps oplog entry marks the commit of a transaction. + if (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) { + result.transactionCommitted = true; + } + } catch (const DBException& ex) { + if (ex.code() == ErrorCodes::IncompleteTransactionHistory) { + result.hasIncompleteHistory = true; + break; + } + + throw; + } + } + + return result; +} + +void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) { + // Current code only supports replacement update. + dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates())); + + AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX); + + uassert(40527, + str::stream() << "Unable to persist transaction state because the session transaction " + "collection is missing. This indicates that the " + << NamespaceString::kSessionTransactionsTableNamespace.ns() + << " collection has been manually deleted.", + autoColl.getCollection()); + + WriteUnitOfWork wuow(opCtx); + + auto collection = autoColl.getCollection(); + auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx); + + uassert(40672, + str::stream() << "Failed to fetch _id index for " + << NamespaceString::kSessionTransactionsTableNamespace.ns(), + idIndex); + + auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex); + // Since we are looking up a key inside the _id index, create a key object consisting of only + // the _id field. + auto idToFetch = updateRequest.getQuery().firstElement(); + auto toUpdateIdDoc = idToFetch.wrap(); + dassert(idToFetch.fieldNameStringData() == "_id"_sd); + auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc); + auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId(); + + if (recordId.isNull()) { + // Upsert case. + auto status = collection->insertDocument( + opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false); + + if (status == ErrorCodes::DuplicateKey) { + throw WriteConflictException(); + } + + uassertStatusOK(status); + wuow.commit(); + return; + } + + auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId); + auto originalDoc = originalRecordData.toBson(); + + invariant(collection->getDefaultCollator() == nullptr); + boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr)); + + auto matcher = + fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx))); + if (!matcher->matchesBSON(originalDoc)) { + // Document no longer match what we expect so throw WCE to make the caller re-examine. + throw WriteConflictException(); + } + + CollectionUpdateArgs args; + args.update = updateRequest.getUpdates(); + args.criteria = toUpdateIdDoc; + args.fromMigrate = false; + + collection->updateDocument(opCtx, + recordId, + Snapshotted<BSONObj>(startingSnapshotId, originalDoc), + updateRequest.getUpdates(), + false, // indexesAffected = false because _id is the only index + nullptr, + &args); + + wuow.commit(); +} + +// Failpoint which allows different failure actions to happen after each write. Supports the +// parameters below, which can be combined with each other (unless explicitly disallowed): +// +// closeConnection (bool, default = true): Closes the connection on which the write was executed. +// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception +// code will be thrown, which will cause the write to not commit; if not specified, the write +// will be allowed to commit. +MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite); + +} // namespace + +const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1)); TransactionParticipant* TransactionParticipant::get(OperationContext* opCtx) { auto session = OperationContextSession::get(opCtx); @@ -100,12 +289,9 @@ TransactionParticipant* TransactionParticipant::getFromNonCheckedOutSession(Sess return &getTransactionParticipant(session); } -const Session* TransactionParticipant::_getSession() const { - return getTransactionParticipant.owner(this); -} - -Session* TransactionParticipant::_getSession() { - return getTransactionParticipant.owner(this); +const LogicalSessionId& TransactionParticipant::_sessionId() const { + const auto* owningSession = getTransactionParticipant.owner(this); + return owningSession->getSessionId(); } void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) { @@ -132,11 +318,11 @@ void TransactionParticipant::_continueMultiDocumentTransaction(WithLock wl, TxnN txnNumber == _activeTxnNumber && !_txnState.isNone(wl)); if (_txnState.isInProgress(wl) && !_txnResourceStash) { - // This indicates that the first command in the transaction failed but did not - // implicitly abort the transaction. It is not safe to continue the transaction, in - // particular because we have not saved the readConcern from the first statement of - // the transaction. + // This indicates that the first command in the transaction failed but did not implicitly + // abort the transaction. It is not safe to continue the transaction, in particular because + // we have not saved the readConcern from the first statement of the transaction. _abortTransactionOnSession(wl); + uasserted(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << txnNumber << " has been aborted."); } @@ -177,10 +363,15 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, boost::optional<bool> autocommit, boost::optional<bool> startTransaction) { stdx::lock_guard<stdx::mutex> lg(_mutex); + _checkValid(lg); - if (auto newState = _getSession()->getLastRefreshState()) { - _updateState(lg, *newState); - } + uassert(ErrorCodes::TransactionTooOld, + str::stream() << "Cannot start transaction " << txnNumber << " on session " + << _sessionId() + << " because a newer transaction " + << _activeTxnNumber + << " has already started.", + txnNumber >= _activeTxnNumber); // Requests without an autocommit field are interpreted as retryable writes. They cannot specify // startTransaction, which is verified earlier when parsing the request. @@ -201,14 +392,14 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, } // Attempt to start a multi-statement transaction, which requires startTransaction be given as - // an argument on the request. startTransaction can only be specified as true, which is verified - // earlier when parsing the request. + // an argument on the request. The 'startTransaction' argument currently can only be specified + // as true, which is verified earlier, when parsing the request. invariant(*startTransaction); - // Servers in a sharded cluster can start a new transaction at the active transaction number to - // allow internal retries by routers on re-targeting errors, like StaleShardVersion or - // SnapshotTooOld. if (txnNumber == _activeTxnNumber) { + // Servers in a sharded cluster can start a new transaction at the active transaction number + // to allow internal retries by routers on re-targeting errors, like + // StaleShard/DatabaseVersion or SnapshotTooOld. uassert(ErrorCodes::ConflictingOperationInProgress, "Only servers in a sharded cluster can start a new transaction at the active " "transaction number", @@ -235,7 +426,11 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, void TransactionParticipant::beginOrContinueTransactionUnconditionally(TxnNumber txnNumber) { stdx::lock_guard<stdx::mutex> lg(_mutex); - // Continuing transaction unconditionally is a no-op since we don't check any on-disk state. + + // We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the + // purposes of this method and continue the transaction unconditionally + _isValid = true; + if (_activeTxnNumber != txnNumber) { _beginMultiDocumentTransaction(lg, txnNumber); } @@ -450,6 +645,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // Always check session's txnNumber and '_txnState', since they can be modified by session // kill and migration, which do not check out the session. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); + // If this is not a multi-document transaction, there is nothing to unstash. if (_txnState.isNone(lg)) { invariant(!_txnResourceStash); @@ -525,15 +721,11 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, boost::optional<repl::OpTime> prepareOptime) { stdx::unique_lock<stdx::mutex> lk(_mutex); + // Always check session's txnNumber and '_txnState', since they can be modified by // session kill and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); - _getSession()->lockTxnNumber( - _activeTxnNumber, - {ErrorCodes::PreparedTransactionInProgress, - "cannot change transaction number while the session has a prepared transaction"}); - ScopeGuard abortGuard = MakeGuard([&] { // Prepare transaction on secondaries should always succeed. invariant(!prepareOptime); @@ -548,8 +740,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, // It is illegal for aborting a prepared transaction to fail for any reason, so we crash // instead. severe() << "Caught exception during abort of prepared transaction " - << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON() - << ": " << exceptionToStatus(); + << opCtx->getTxnNumber() << " on " << _sessionId().toBSON() << ": " + << exceptionToStatus(); std::terminate(); } }); @@ -739,14 +931,12 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); _finishCommitTransaction(lk, opCtx); - _getSession()->unlockTxnNumber(); - } catch (...) { // It is illegal for committing a prepared transaction to fail for any reason, other than an // invalid command, so we crash instead. severe() << "Caught exception during commit of prepared transaction " - << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON() - << ": " << exceptionToStatus(); + << opCtx->getTxnNumber() << " on " << _sessionId().toBSON() << ": " + << exceptionToStatus(); std::terminate(); } } @@ -766,7 +956,7 @@ void TransactionParticipant::_commitStorageTransaction(OperationContext* opCtx) } catch (...) { // It is illegal for committing a storage-transaction to fail so we crash instead. severe() << "Caught exception during commit of storage-transaction " << opCtx->getTxnNumber() - << " on " << _getSession()->getSessionId().toBSON() << ": " << exceptionToStatus(); + << " on " << _sessionId().toBSON() << ": " << exceptionToStatus(); std::terminate(); } @@ -825,7 +1015,7 @@ void TransactionParticipant::abortArbitraryTransactionIfExpired() { return; } - const auto session = _getSession(); + const auto* session = getTransactionParticipant.owner(this); auto currentOperation = session->getCurrentOperation(); if (currentOperation) { // If an operation is still running for this transaction when it expires, kill the currently @@ -879,8 +1069,8 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction( } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. severe() << "Caught exception during transaction " << opCtx->getTxnNumber() - << " abort or stash on " << _getSession()->getSessionId().toBSON() << " in state " - << _txnState << ": " << exceptionToStatus(); + << " abort or stash on " << _sessionId().toBSON() << " in state " << _txnState << ": " + << exceptionToStatus(); std::terminate(); } @@ -997,8 +1187,6 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _prepareOpTime = repl::OpTime(); _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); - - _getSession()->unlockTxnNumber(); } void TransactionParticipant::_cleanUpTxnResourceOnOpCtx( @@ -1027,28 +1215,18 @@ void TransactionParticipant::_cleanUpTxnResourceOnOpCtx( void TransactionParticipant::_checkIsActiveTransaction(WithLock wl, const TxnNumber& requestTxnNumber, bool checkAbort) const { - const auto txnNumber = _getSession()->getActiveTxnNumber(); - uassert(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "Cannot perform operations on active transaction " << _activeTxnNumber - << " on session " - << _getSession()->getSessionId() - << " because a different transaction " - << txnNumber - << " is now active.", - txnNumber == _activeTxnNumber); - uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Cannot perform operations on requested transaction " << requestTxnNumber << " on session " - << _getSession()->getSessionId() + << _sessionId() << " because a different transaction " << _activeTxnNumber << " is now active.", requestTxnNumber == _activeTxnNumber); uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << txnNumber << " has been aborted.", + str::stream() << "Transaction " << _activeTxnNumber << " has been aborted.", !checkAbort || !_txnState.isAborted(wl)); } @@ -1098,7 +1276,7 @@ void TransactionParticipant::reportStashedState(BSONObjBuilder* builder) const { { BSONObjBuilder lsid(builder->subobjStart("lsid")); - _getSession()->getSessionId().serialize(&lsid); + _sessionId().serialize(&lsid); } BSONObjBuilder transactionBuilder; @@ -1238,21 +1416,6 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl, builder, readConcernArgs, tickSource, tickSource->getTicks()); } -void TransactionParticipant::_updateState(WithLock wl, const Session::RefreshState& newState) { - if (newState.refreshCount <= _lastStateRefreshCount) { - return; - } - - _activeTxnNumber = newState.txnNumber; - if (newState.isCommitted) { - _txnState.transitionTo(wl, - TransactionState::kCommitted, - TransactionState::TransitionValidation::kRelaxTransitionValidation); - } - - _lastStateRefreshCount = newState.refreshCount; -} - std::string TransactionParticipant::_transactionInfoForLog( const SingleThreadedLockStats* lockStats, TransactionState::StateFlag terminationCause, @@ -1267,7 +1430,7 @@ std::string TransactionParticipant::_transactionInfoForLog( BSONObjBuilder parametersBuilder; BSONObjBuilder lsidBuilder(parametersBuilder.subobjStart("lsid")); - _getSession()->getSessionId().serialize(&lsidBuilder); + _sessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); parametersBuilder.append("txnNumber", _activeTxnNumber); @@ -1329,18 +1492,14 @@ void TransactionParticipant::_logSlowTransaction(WithLock wl, } } -void TransactionParticipant::checkForNewTxnNumber() { - auto txnNumber = _getSession()->getActiveTxnNumber(); - - stdx::lock_guard<stdx::mutex> lg(_mutex); - if (txnNumber > _activeTxnNumber) { - _setNewTxnNumber(lg, txnNumber); - } -} - void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber) { - invariant(!_txnState.isInSet( - wl, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare)); + uassert(ErrorCodes::PreparedTransactionInProgress, + "Cannot change transaction number while the session has a prepared transaction", + !_txnState.isInSet( + wl, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare)); + + LOG_FOR_TRANSACTION(4) << "New transaction started with txnNumber: " << txnNumber + << " on session with lsid " << _sessionId().getId(); // Abort the existing transaction if it's not prepared, committed, or aborted. if (_txnState.isInProgress(wl)) { @@ -1348,16 +1507,357 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN } _activeTxnNumber = txnNumber; + + // Reset the retryable writes state + _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; + + // Reset the transactional state _txnState.transitionTo(wl, TransactionState::kNone); - { - stdx::lock_guard<stdx::mutex> lm(_metricsMutex); - _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); - } _prepareOpTime = repl::OpTime(); _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; + + // Reset the transactions metrics + stdx::lock_guard<stdx::mutex> lm(_metricsMutex); + _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); +} + +void TransactionParticipant::refreshFromStorageIfNeeded(OperationContext* opCtx) { + if (opCtx->getClient()->isInDirectClient()) { + return; + } + + invariant(!opCtx->lockState()->isLocked()); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + + while (!_isValid) { + const int numInvalidations = _numInvalidations; + + ul.unlock(); + + auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId()); + + ul.lock(); + + // Protect against concurrent refreshes or invalidations + if (!_isValid && _numInvalidations == numInvalidations) { + _isValid = true; + _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); + + if (_lastWrittenSessionRecord) { + _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); + _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); + _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; + + if (activeTxnHistory.transactionCommitted) { + _txnState.transitionTo( + ul, + TransactionState::kCommitted, + TransactionState::TransitionValidation::kRelaxTransitionValidation); + } + } + + break; + } + } +} + +void TransactionParticipant::onWriteOpCompletedOnPrimary( + OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + + // Sanity check that we don't double-execute statements + for (const auto stmtId : stmtIdsWritten) { + const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId); + if (stmtOpTime) { + fassertOnRepeatedExecution( + _sessionId(), txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime); + } + } + + const auto updateRequest = + _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState); + + ul.unlock(); + + repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); + + updateSessionEntry(opCtx, updateRequest); + _registerUpdateCacheOnCommit( + opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); +} + +bool TransactionParticipant::onMigrateBeginOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + StmtId stmtId) { + beginOrContinue(txnNumber, boost::none, boost::none); + + try { + if (checkStatementExecuted(opCtx, txnNumber, stmtId)) { + return false; + } + } catch (const DBException& ex) { + // If the transaction chain was truncated on the recipient shard, then we + // are most likely copying from a session that hasn't been touched on the + // recipient shard for a very long time but could be recent on the donor. + // We continue copying regardless to get the entire transaction from the donor. + if (ex.code() != ErrorCodes::IncompleteTransactionHistory) { + throw; + } + if (stmtId == kIncompleteHistoryStmtId) { + return false; + } + } + + return true; +} + +void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t oplogLastStmtIdWriteDate) { + invariant(opCtx->lockState()->inAWriteUnitOfWork()); + + stdx::unique_lock<stdx::mutex> ul(_mutex); + + _checkValid(ul); + _checkIsActiveTransaction(ul, txnNumber); + + // We do not migrate transaction oplog entries. + auto txnState = boost::none; + const auto updateRequest = _makeUpdateRequest( + ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState); + + ul.unlock(); + + repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx); + + updateSessionEntry(opCtx, updateRequest); + _registerUpdateCacheOnCommit( + opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime); +} + +void TransactionParticipant::invalidate() { + stdx::lock_guard<stdx::mutex> lg(_mutex); + + uassert(ErrorCodes::PreparedTransactionInProgress, + "Cannot invalidate prepared transaction", + !_txnState.isInSet( + lg, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare)); + + _isValid = false; + _numInvalidations++; + + _lastWrittenSessionRecord.reset(); + + _activeTxnNumber = kUninitializedTxnNumber; + _activeTxnCommittedStatements.clear(); + _hasIncompleteHistory = false; +} + +repl::OpTime TransactionParticipant::getLastWriteOpTime(TxnNumber txnNumber) const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _checkValid(lg); + _checkIsActiveTransaction(lg, txnNumber); + + if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) + return {}; + + return _lastWrittenSessionRecord->getLastWriteOpTime(); +} + +boost::optional<repl::OplogEntry> TransactionParticipant::checkStatementExecuted( + OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) const { + const auto stmtTimestamp = [&] { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _checkStatementExecuted(lg, txnNumber, stmtId); + }(); + + if (!stmtTimestamp) + return boost::none; + + TransactionHistoryIterator txnIter(*stmtTimestamp); + while (txnIter.hasNext()) { + const auto entry = txnIter.next(opCtx); + invariant(entry.getStatementId()); + if (*entry.getStatementId() == stmtId) + return entry; + } + + MONGO_UNREACHABLE; +} + +bool TransactionParticipant::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, + StmtId stmtId) const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return bool(_checkStatementExecuted(lg, txnNumber, stmtId)); +} + +void TransactionParticipant::_checkValid(WithLock) const { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Session " << _sessionId() + << " was concurrently modified and the operation must be retried.", + _isValid); +} + +void TransactionParticipant::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const { + uassert(ErrorCodes::ConflictingOperationInProgress, + str::stream() << "Cannot perform operations on transaction " << txnNumber + << " on session " + << _sessionId() + << " because a different transaction " + << _activeTxnNumber + << " is now active.", + txnNumber == _activeTxnNumber); +} + +boost::optional<repl::OpTime> TransactionParticipant::_checkStatementExecuted(WithLock wl, + TxnNumber txnNumber, + StmtId stmtId) const { + _checkValid(wl); + _checkIsActiveTransaction(wl, txnNumber); + + const auto it = _activeTxnCommittedStatements.find(stmtId); + if (it == _activeTxnCommittedStatements.end()) { + uassert(ErrorCodes::IncompleteTransactionHistory, + str::stream() << "Incomplete history detected for transaction " << txnNumber + << " on session " + << _sessionId(), + !_hasIncompleteHistory); + + return boost::none; + } + + invariant(_lastWrittenSessionRecord); + invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber); + + return it->second; +} + +Date_t TransactionParticipant::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const { + _checkValid(wl); + _checkIsActiveTransaction(wl, txnNumber); + + if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber) + return {}; + + return _lastWrittenSessionRecord->getLastWriteDate(); +} + +UpdateRequest TransactionParticipant::_makeUpdateRequest( + WithLock, + TxnNumber newTxnNumber, + const repl::OpTime& newLastWriteOpTime, + Date_t newLastWriteDate, + boost::optional<DurableTxnStateEnum> newState) const { + UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace); + + const auto updateBSON = [&] { + SessionTxnRecord newTxnRecord; + newTxnRecord.setSessionId(_sessionId()); + newTxnRecord.setTxnNum(newTxnNumber); + newTxnRecord.setLastWriteOpTime(newLastWriteOpTime); + newTxnRecord.setLastWriteDate(newLastWriteDate); + newTxnRecord.setState(newState); + return newTxnRecord.toBSON(); + }(); + updateRequest.setUpdates(updateBSON); + updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId().toBSON())); + updateRequest.setUpsert(true); + + return updateRequest; +} + +void TransactionParticipant::_registerUpdateCacheOnCommit( + OperationContext* opCtx, + TxnNumber newTxnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime) { + opCtx->recoveryUnit()->onCommit( + [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ]( + boost::optional<Timestamp>) { + RetryableWritesStats::get(getGlobalServiceContext()) + ->incrementTransactionsCollectionWriteCount(); + + stdx::lock_guard<stdx::mutex> lg(_mutex); + + if (!_isValid) + return; + + // The cache of the last written record must always be advanced after a write so that + // subsequent writes have the correct point to start from. + if (!_lastWrittenSessionRecord) { + _lastWrittenSessionRecord.emplace(); + + _lastWrittenSessionRecord->setSessionId(_sessionId()); + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } else { + if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum()) + _lastWrittenSessionRecord->setTxnNum(newTxnNumber); + + if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime()) + _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime); + } + + if (newTxnNumber > _activeTxnNumber) { + // This call is necessary in order to advance the txn number and reset the cached + // state in the case where just before the storage transaction commits, the cache + // entry gets invalidated and immediately refreshed while there were no writes for + // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber + // and we will fail to update the cache even though the write was successful. + _beginOrContinueRetryableWrite(lg, newTxnNumber); + } + + if (newTxnNumber == _activeTxnNumber) { + for (const auto stmtId : stmtIdsWritten) { + if (stmtId == kIncompleteHistoryStmtId) { + _hasIncompleteHistory = true; + continue; + } + + const auto insertRes = + _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime); + if (!insertRes.second) { + const auto& existingOpTime = insertRes.first->second; + fassertOnRepeatedExecution(_sessionId(), + newTxnNumber, + stmtId, + existingOpTime, + lastStmtIdWriteOpTime); + } + } + } + }); + + MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) { + const auto& data = customArgs.getData(); + + const auto closeConnectionElem = data["closeConnection"]; + if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) { + opCtx->getClient()->session()->end(); + } + + const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"]; + if (!failBeforeCommitExceptionElem.eoo()) { + const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number())); + uasserted(failureCode, + str::stream() << "Failing write for " << _sessionId() << ":" << newTxnNumber + << " due to failpoint. The write must not be reflected."); + } + } } } // namespace mongo diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 9bdefa70dec..29e9b3e8bfe 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -36,15 +36,18 @@ #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" +#include "mongo/db/ops/update_request.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/db/single_transaction_stats.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/transaction_metrics_observer.h" +#include "mongo/stdx/unordered_map.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/decorable.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -139,6 +142,10 @@ public: OperationContext* _opCtx; }; + using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; + + static const BSONObj kDeadEndSentinel; + TransactionParticipant() = default; /** @@ -353,22 +360,44 @@ public: } /** - * Starts a new transaction, or continues an already active transaction. + * Starts a new transaction (and if the txnNumber is newer aborts any in-progress transaction on + * the session), or continues an already active transaction. + * + * 'autocommit' comes from the 'autocommit' field in the original client request. The only valid + * values are boost::none (meaning no autocommit was specified) and false (meaning that this is + * the beginning of a multi-statement transaction). + * + * 'startTransaction' comes from the 'startTransaction' field in the original client request. + * See below for the acceptable values and the meaning of the combinations of autocommit and + * startTransaction. * - * The 'autocommit' argument represents the value of the field given in the original client - * request. If it is boost::none, no autocommit parameter was passed into the request. Every - * operation that is part of a multi statement transaction must specify 'autocommit=false'. - * 'startTransaction' represents the value of the field given in the original client request, - * and indicates whether this operation is the beginning of a multi-statement transaction. + * autocommit = boost::none, startTransaction = boost::none: Means retryable write + * autocommit = false, startTransaction = boost::none: Means continuation of a multi-statement + * transaction + * autocommit = false, startTransaction = true: Means abort whatever transaction is in progress + * on the session and start a new transaction * - * Throws an exception if: - * - The values of 'autocommit' and/or 'startTransaction' are inconsistent with the current - * state of the transaction. + * Any combination other than the ones listed above will invariant since it is expected that the + * caller has performed the necessary customer input validations. + * + * Exceptions of note, which can be thrown are: + * - TransactionTooOld - if attempt is made to start a transaction older than the currently + * active one or the last one which committed + * - PreparedTransactionInProgress - if the transaction is in the prepared state and a new + * transaction or retryable write is attempted */ void beginOrContinue(TxnNumber txnNumber, boost::optional<bool> autocommit, boost::optional<bool> startTransaction); + /** + * Used only by the secondary oplog application logic. Equivalent to 'beginOrContinue(txnNumber, + * false, true)' without performing any checks for whether the new txnNumber will start a + * transaction number in the past. + * + * NOTE: This method assumes that there are no concurrent users of the transaction since it + * unconditionally changes the active transaction on the session. + */ void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber); void transitionToPreparedforTest() { @@ -382,10 +411,101 @@ public: } /** - * Checks to see if the txnNumber changed in the parent session and perform the necessary - * cleanup. + * Blocking method, which loads the transaction state from storage if it has been marked as + * needing refresh. + * + * In order to avoid the possibility of deadlock, this method must not be called while holding a + * lock. + */ + void refreshFromStorageIfNeeded(OperationContext* opCtx); + + TxnNumber getActiveTxnNumber() const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _activeTxnNumber; + } + + /** + * Called after a write under the specified transaction completes while the node is a primary + * and specifies the statement ids which were written. Must be called while the caller is still + * in the write's WUOW. Updates the on-disk state of the session to match the specified + * transaction/opTime and keeps the cached state in sync. + * + * 'txnState' is 'none' for retryable writes. + * + * Must only be called with the session checked-out. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + void onWriteOpCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState); + + /** + * Helper function to begin a migration on a primary node. + * + * Returns whether the specified statement should be migrated at all or skipped. + * + * Not called with session checked out. + */ + bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId); + + /** + * Called after an entry for the specified session and transaction has been written to the oplog + * during chunk migration, while the node is still primary. Must be called while the caller is + * still in the oplog write's WUOW. Updates the on-disk state of the session to match the + * specified transaction/opTime and keeps the cached state in sync. + * + * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary + * and doesn't require the session to be checked-out. + * + * Throws if the session has been invalidated or the active transaction number is newer than the + * one specified. + */ + void onMigrateCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t oplogLastStmtIdWriteDate); + + /** + * Marks the session as requiring refresh. Used when the session state has been modified + * externally, such as through a direct write to the transactions table. + */ + void invalidate(); + + /** + * Returns the op time of the last committed write for this session and transaction. If no write + * has completed yet, returns an empty timestamp. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; + + /** + * Checks whether the given statementId for the specified transaction has already executed and + * if so, returns the oplog entry which was generated by that write. If the statementId hasn't + * executed, returns boost::none. + * + * Must only be called with the session checked-out. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx, + TxnNumber txnNumber, + StmtId stmtId) const; + + /** + * Checks whether the given statementId for the specified transaction has already executed + * without fetching the oplog entry which was generated by that write. + * + * Must only be called with the session checked-out. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. */ - void checkForNewTxnNumber(); + bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; private: /** @@ -503,6 +623,43 @@ private: return (s << txnState.toString()); } + // Shortcut to obtain the id of the session under which this participant runs + const LogicalSessionId& _sessionId() const; + + /** + * Performing any checks based on the in-memory state of the TransactionParticipant requires + * that the object is fully in sync with its on-disk representation in the transactions table. + * This method checks that. The object can be out of sync with the on-disk representation either + * when it was just created, or after invalidate() was called (which typically happens after a + * direct write to the transactions table). + */ + void _checkValid(WithLock) const; + + // Checks that the specified transaction number is the same as the activeTxnNumber. Effectively + // a check that the caller operates on the transaction it thinks it is operating on. + void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const; + + boost::optional<repl::OpTime> _checkStatementExecuted(WithLock, + TxnNumber txnNumber, + StmtId stmtId) const; + + // Returns the write date of the last committed write for this session and transaction. If no + // write has completed yet, returns an empty date. + // + // Throws if the session has been invalidated or the active transaction number doesn't match. + Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const; + + UpdateRequest _makeUpdateRequest(WithLock, + TxnNumber newTxnNumber, + const repl::OpTime& newLastWriteOpTime, + Date_t newLastWriteDate, + boost::optional<DurableTxnStateEnum> newState) const; + + void _registerUpdateCacheOnCommit(OperationContext* opCtx, + TxnNumber newTxnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteTs); + // Finishes committing the multi-document transaction after the storage-transaction has been // committed, the oplog entry has been inserted into the oplog, and the transactions table has // been updated. @@ -562,8 +719,6 @@ private: BSONObjBuilder* builder, repl::ReadConcernArgs readConcernArgs) const; - void _updateState(WithLock wl, const Session::RefreshState& newState); - // Bumps up the transaction number of this transaction and perform the necessary cleanup. void _setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber); @@ -577,10 +732,6 @@ private: // number. void _continueMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber); - // Returns the session that this transaction belongs to. - const Session* _getSession() const; - Session* _getSession(); - // Protects the member variables below. mutable stdx::mutex _mutex; @@ -599,10 +750,9 @@ private: // Total size in bytes of all operations within the _transactionOperations vector. size_t _transactionOperationBytes = 0; - // This is the txnNumber that this transaction is actively working on. It can be different from - // the current txnNumber of the parent session (since it can be changed in couple of ways, like - // migration). In which case, it should make the necessary steps to also bump this number, like - // aborting the current transaction. + // Tracks the last seen txn number for the session and is always >= to the transaction number in + // the last written txn record. When it is > than that in the last written txn record, this + // means a new transaction has begun on the session, but it hasn't yet performed any writes. TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; // Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long @@ -625,8 +775,28 @@ private: std::vector<MultikeyPathInfo> _multikeyPathInfo; - // Remembers the refresh count this object has read from Session. - long long _lastStateRefreshCount{0}; + // + // Retryable writes state + // + + // Specifies whether the session information needs to be refreshed from storage + bool _isValid{false}; + + // Counter, incremented with each call to invalidate in order to discern invalidations, which + // happen during refresh + int _numInvalidations{0}; + + // Set to true if incomplete history is detected. For example, when the oplog to a write was + // truncated because it was too old. + bool _hasIncompleteHistory{false}; + + // Caches what is known to be the last written transaction record for the session + boost::optional<SessionTxnRecord> _lastWrittenSessionRecord; + + // For the active txn, tracks which statement ids have been committed and at which oplog + // opTime. Used for fast retryability check and retrieving the previous write's data without + // having to scan through the oplog. + CommittedStatementTimestampMap _activeTxnCommittedStatements; // Protects _transactionMetricsObserver. The concurrency rules are that const methods on // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 193b219db61..0836660bba1 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -43,6 +43,7 @@ #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/fill_locker_info.h" +#include "mongo/db/transaction_participant.h" #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/death_test.h" @@ -85,14 +86,34 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, class OpObserverMock : public OpObserverNoop { public: - void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override; + void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override { + ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); + OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime); + + uassert(ErrorCodes::OperationFailed, + "onTransactionPrepare() failed", + !onTransactionPrepareThrowsException); + + onTransactionPrepareFn(); + } + bool onTransactionPrepareThrowsException = false; bool transactionPrepared = false; stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; }; void onTransactionCommit(OperationContext* opCtx, boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) override; + boost::optional<Timestamp> commitTimestamp) override { + ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); + OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp); + + uassert(ErrorCodes::OperationFailed, + "onTransactionCommit() failed", + !onTransactionCommitThrowsException); + + onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp); + } + bool onTransactionCommitThrowsException = false; bool transactionCommitted = false; stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)> @@ -101,31 +122,7 @@ public: boost::optional<Timestamp> commitTimestamp) { transactionCommitted = true; }; }; -void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) { - ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime); - - uassert(ErrorCodes::OperationFailed, - "onTransactionPrepare() failed", - !onTransactionPrepareThrowsException); - - onTransactionPrepareFn(); -} - -void OpObserverMock::onTransactionCommit(OperationContext* opCtx, - boost::optional<OplogSlot> commitOplogEntryOpTime, - boost::optional<Timestamp> commitTimestamp) { - ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork()); - OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp); - - uassert(ErrorCodes::OperationFailed, - "onTransactionCommit() failed", - !onTransactionCommitThrowsException); - - onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp); -} - -class SessionTest : public MockReplCoordServerFixture { +class TransactionParticipantRetryableWritesTest : public MockReplCoordServerFixture { protected: void setUp() final { MockReplCoordServerFixture::setUp(); @@ -194,13 +191,15 @@ protected: repl::OpTime prevOpTime, boost::optional<DurableTxnStateEnum> txnState) { const auto uuid = UUID::gen(); - session->beginOrContinueTxn(opCtx(), txnNum); + + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime); - session->onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState); wuow.commit(); @@ -228,26 +227,29 @@ protected: ASSERT(txnRecord.getState() == txnState); ASSERT_EQ(txnState != boost::none, txnRecordObj.hasField(SessionTxnRecord::kStateFieldName)); - ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); - session->invalidate(); - session->refreshFromStorageIfNeeded(opCtx()); - ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); + + txnParticipant->invalidate(); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); } OpObserverMock* _opObserver = nullptr; }; -TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { +TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); ASSERT_EQ(sessionId, session.getSessionId()); - ASSERT(session.getLastWriteOpTime(txnNum).isNull()); + ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -256,13 +258,14 @@ TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { ASSERT(!cursor->more()); } -TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { +TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 21; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none); @@ -279,13 +282,15 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { ASSERT_EQ(txnNum, txnRecord.getTxnNum()); ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); ASSERT(!txnRecord.getState()); - ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum)); + ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); } -TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { +TEST_F(TransactionParticipantRetryableWritesTest, + StartingNewerTransactionUpdatesThePersistedSession) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none); @@ -303,17 +308,18 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT_EQ(200, txnRecord.getTxnNum()); ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); ASSERT(!txnRecord.getState()); - ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); + ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); - session.invalidate(); - session.refreshFromStorageIfNeeded(opCtx()); - ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); + txnParticipant->invalidate(); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); + ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); } -TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) { +TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); assertTxnRecord(&session, 100, 0, firstOpTime, boost::none); @@ -327,25 +333,26 @@ TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) { assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none); } -TEST_F(SessionTest, StartingOldTxnShouldAssert) { +TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum - 1), + ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNum - 1, boost::none, boost::none), AssertionException, ErrorCodes::TransactionTooOld); - ASSERT(session.getLastWriteOpTime(txnNum).isNull()); + ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); } -TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { - const auto uuid = UUID::gen(); +TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); // Drop the transactions table BSONObj dropResult; @@ -354,84 +361,93 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult)); const TxnNumber txnNum = 21; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); + + const auto uuid = UUID::gen(); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - ASSERT_THROWS(session.onWriteOpCompletedOnPrimary( + ASSERT_THROWS(txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), AssertionException); } -TEST_F(SessionTest, CheckStatementExecuted) { +TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); - ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); + ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); + ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); - ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); + ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); + ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); // Invalidate the session and ensure the statements still check out - session.invalidate(); - session.refreshFromStorageIfNeeded(opCtx()); + txnParticipant->invalidate(); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); } -TEST_F(SessionTest, CheckStatementExecutedForOldTransactionThrows) { +TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum - 1, 0), + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum - 1, 0), AssertionException, ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, CheckStatementExecutedForInvalidatedTransactionThrows) { +TEST_F(TransactionParticipantRetryableWritesTest, + CheckStatementExecutedForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.invalidate(); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->invalidate(); - ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), 100, 0), + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0), AssertionException, ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { - const auto uuid = UUID::gen(); +TEST_F(TransactionParticipantRetryableWritesTest, + WriteOpCompletedOnPrimaryForOldTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); + + const auto uuid = UUID::gen(); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); wuow.commit(); } @@ -440,60 +456,64 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + ASSERT_THROWS_CODE(txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none), AssertionException, ErrorCodes::ConflictingOperationInProgress); } } -TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { - const auto uuid = UUID::gen(); +TEST_F(TransactionParticipantRetryableWritesTest, + WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); + const auto uuid = UUID::gen(); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - session.invalidate(); + txnParticipant->invalidate(); - ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + ASSERT_THROWS_CODE(txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), AssertionException, ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { - const auto uuid = UUID::gen(); +TEST_F(TransactionParticipantRetryableWritesTest, + WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; - session.beginOrContinueTxn(opCtx(), txnNum); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); + const auto uuid = UUID::gen(); const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); - session.invalidate(); + txnParticipant->invalidate(); wuow.commit(); } - session.refreshFromStorageIfNeeded(opCtx()); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0)); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0)); } -TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) { +TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 2; @@ -546,22 +566,23 @@ TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) { } Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); - ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0), + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0), AssertionException, ErrorCodes::IncompleteTransactionHistory); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1)); - ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1)); + ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2)); - ASSERT_THROWS_CODE(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 0), + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 0), AssertionException, ErrorCodes::IncompleteTransactionHistory); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1)); - ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1)); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2)); } -TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { +TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { const auto uuid = UUID::gen(); const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 2; @@ -571,8 +592,9 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { osi.setTxnNumber(txnNum); Session session(sessionId); - session.refreshFromStorageIfNeeded(opCtx()); - session.beginOrContinueTxn(opCtx(), txnNum); + const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); auto firstOpTime = ([&]() { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); @@ -585,7 +607,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { kNss, uuid, BSON("x" << 1), - &Session::kDeadEndSentinel, + &TransactionParticipant::kDeadEndSentinel, false, wallClockTime, osi, @@ -593,7 +615,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { {}, false /* prepare */, OplogSlot()); - session.onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none); wuow.commit(); @@ -614,7 +636,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { kNss, uuid, {}, - &Session::kDeadEndSentinel, + &TransactionParticipant::kDeadEndSentinel, false, wallClockTime, osi, @@ -623,30 +645,30 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { false /* prepare */, OplogSlot()); - session.onWriteOpCompletedOnPrimary( + txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none); wuow.commit(); } { - auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); + auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1); ASSERT_TRUE(oplog); ASSERT_EQ(firstOpTime, oplog->getOpTime()); } - ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); + ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); // Should have the same behavior after loading state from storage. - session.invalidate(); - session.refreshFromStorageIfNeeded(opCtx()); + txnParticipant->invalidate(); + txnParticipant->refreshFromStorageIfNeeded(opCtx()); { - auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); + auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1); ASSERT_TRUE(oplog); ASSERT_EQ(firstOpTime, oplog->getOpTime()); } - ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); + ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); } } // namespace diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 06ec5f5a00e..20282f83b25 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -227,24 +227,21 @@ protected: void bumpTxnNumberFromDifferentOpCtx(const LogicalSessionId& sessionId, TxnNumber newTxnNum) { auto func = [sessionId, newTxnNum](OperationContext* opCtx) { - auto session = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId); auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); // Check that there is a transaction in progress with a lower txnNumber. ASSERT(txnParticipant->inMultiDocumentTransaction()); - ASSERT_LT(session->getActiveTxnNumber(), newTxnNum); + ASSERT_LT(txnParticipant->getActiveTxnNumber(), newTxnNum); // Check that the transaction has some operations, so we can ensure they are cleared. ASSERT_GT(txnParticipant->transactionOperationsForTest().size(), 0u); // Bump the active transaction number on the txnParticipant. This should clear all state // from the previous transaction. - session->beginOrContinueTxn(opCtx, newTxnNum); - ASSERT_EQ(session->getActiveTxnNumber(), newTxnNum); - - txnParticipant->checkForNewTxnNumber(); + txnParticipant->beginOrContinue(newTxnNum, boost::none, boost::none); + ASSERT_EQ(newTxnNum, txnParticipant->getActiveTxnNumber()); ASSERT_FALSE(txnParticipant->transactionIsAborted()); ASSERT_EQ(txnParticipant->transactionOperationsForTest().size(), 0u); }; @@ -1114,9 +1111,11 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr auto func = [&](OperationContext* newOpCtx) { auto session = SessionCatalog::get(newOpCtx)->getOrCreateSession( newOpCtx, *opCtx()->getLogicalSessionId()); + auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); ASSERT_THROWS_CODE( - session->onMigrateBeginOnPrimary(newOpCtx, *opCtx()->getTxnNumber() + 1, 1), + txnParticipant->beginOrContinue(*opCtx()->getTxnNumber() + 1, false, true), AssertionException, ErrorCodes::PreparedTransactionInProgress); }; |