summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-09-27 06:05:54 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-08 02:38:48 -0400
commitb272bf351c39677d1e87d5c7fcd8b15b61465012 (patch)
tree9a92c09de2c9eb4244ca4b97d320f9d1e70637af /src/mongo/db
parent07066a49b935a538ed54716fdd9a98d40c31fba4 (diff)
downloadmongo-b272bf351c39677d1e87d5c7fcd8b15b61465012.tar.gz
SERVER-36799 Move all transactions and retryable writes functionality from Session into TransactionParticipant
This change leaves the Session class to be a plain decorable structure only used for serialization of operations on the same logical session.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/SConscript6
-rw-r--r--src/mongo/db/commands/find_and_modify.cpp4
-rw-r--r--src/mongo/db/op_observer_impl.cpp33
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp108
-rw-r--r--src/mongo/db/operation_context_session_mongod.cpp22
-rw-r--r--src/mongo/db/operation_context_session_mongod.h8
-rw-r--r--src/mongo/db/ops/write_ops_exec.cpp15
-rw-r--r--src/mongo/db/repl/oplog.cpp5
-rw-r--r--src/mongo/db/repl/session_update_tracker.cpp70
-rw-r--r--src/mongo/db/repl/sync_tail.cpp13
-rw-r--r--src/mongo/db/repl/sync_tail_test.cpp1
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp14
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp73
-rw-r--r--src/mongo/db/s/session_catalog_migration_source.cpp17
-rw-r--r--src/mongo/db/s/session_catalog_migration_source_test.cpp3
-rw-r--r--src/mongo/db/session.cpp669
-rw-r--r--src/mongo/db/session.h245
-rw-r--r--src/mongo/db/session_catalog.cpp4
-rw-r--r--src/mongo/db/session_catalog.h1
-rw-r--r--src/mongo/db/transaction_participant.cpp662
-rw-r--r--src/mongo/db/transaction_participant.h220
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp (renamed from src/mongo/db/session_test.cpp)270
-rw-r--r--src/mongo/db/transaction_participant_test.cpp13
23 files changed, 1172 insertions, 1304 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 8f803deac3a..50b21051fdc 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1962,13 +1962,12 @@ env.CppUnitTest(
target='sessions_test',
source=[
'session_catalog_test.cpp',
- 'session_test.cpp',
],
LIBDEPS=[
- 'query_exec',
+ '$BUILD_DIR/mongo/client/read_preference',
'$BUILD_DIR/mongo/db/auth/authmocks',
'$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
- '$BUILD_DIR/mongo/client/read_preference'
+ 'query_exec',
],
)
@@ -1976,6 +1975,7 @@ env.CppUnitTest(
target='transaction_participant_test',
source=[
'transaction_participant_test.cpp',
+ 'transaction_participant_retryable_writes_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/client/read_preference',
diff --git a/src/mongo/db/commands/find_and_modify.cpp b/src/mongo/db/commands/find_and_modify.cpp
index b56ad230b4d..c87fdf830ef 100644
--- a/src/mongo/db/commands/find_and_modify.cpp
+++ b/src/mongo/db/commands/find_and_modify.cpp
@@ -352,9 +352,9 @@ public:
const auto stmtId = 0;
if (opCtx->getTxnNumber() && !inTransaction) {
- auto session = OperationContextSession::get(opCtx);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
if (auto entry =
- session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
+ txnParticipant->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
RetryableWritesStats::get(opCtx)->incrementRetriedCommandsCount();
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
parseOplogEntryForFindAndModify(opCtx, args, *entry, &result);
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index 3c21adbe3fa..34e52fd95ab 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -116,12 +116,13 @@ void onWriteOpCompleted(OperationContext* opCtx,
return;
if (session) {
- session->onWriteOpCompletedOnPrimary(opCtx,
- *opCtx->getTxnNumber(),
- std::move(stmtIdsWritten),
- lastStmtIdWriteOpTime,
- lastStmtIdWriteDate,
- txnState);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant->onWriteOpCompletedOnPrimary(opCtx,
+ *opCtx->getTxnNumber(),
+ std::move(stmtIdsWritten),
+ lastStmtIdWriteOpTime,
+ lastStmtIdWriteDate,
+ txnState);
}
}
@@ -186,7 +187,9 @@ OpTimeBundle replLogUpdate(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
}
OpTimeBundle opTimes;
@@ -250,7 +253,9 @@ OpTimeBundle replLogDelete(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
}
OpTimeBundle opTimes;
@@ -934,8 +939,9 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
repl::OplogLink oplogLink;
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- StmtId stmtId(0);
- oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
// Until we support multiple oplog entries per transaction, prevOpTime should always be null.
invariant(oplogLink.prevOpTime.isNull());
@@ -947,6 +953,8 @@ OpTimeBundle logApplyOpsForTransaction(OperationContext* opCtx,
applyOpsBuilder.append("prepare", true);
}
auto applyOpCmd = applyOpsBuilder.done();
+ const StmtId stmtId(0);
+
auto times = replLogApplyOps(
opCtx, cmdNss, applyOpCmd, sessionInfo, stmtId, oplogLink, prepare, prepareOplogSlot);
@@ -969,14 +977,15 @@ void logCommitOrAbortForPreparedTransaction(OperationContext* opCtx,
const OplogSlot& oplogSlot,
const BSONObj& objectField,
DurableTxnStateEnum durableState) {
- invariant(session->isLockedTxnNumber(*opCtx->getTxnNumber()));
const NamespaceString cmdNss{"admin", "$cmd"};
OperationSessionInfo sessionInfo;
repl::OplogLink oplogLink;
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
const StmtId stmtId(1);
const auto wallClockTime = getWallClockTimeForOpLog(opCtx);
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index d9c72a3b135..c5d202cee71 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -1,30 +1,30 @@
/**
-* Copyright (C) 2017 MongoDB Inc.
-*
-* This program is free software: you can redistribute it and/or modify
-* it under the terms of the GNU Affero General Public License, version 3,
-* as published by the Free Software Foundation.
-*
-* This program is distributed in the hope that it will be useful,
-* but WITHOUT ANY WARRANTY; without even the implied warranty of
-* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-* GNU Affero General Public License for more details.
-*
-* You should have received a copy of the GNU Affero General Public License
-* along with this program. If not, see <http://www.gnu.org/licenses/>.
-*
-* As a special exception, the copyright holders give permission to link the
-* code of portions of this program with the OpenSSL library under certain
-* conditions as described in each individual source file and distribute
-* linked combinations including the program with the OpenSSL library. You
-* must comply with the GNU Affero General Public License in all respects for
-* all of the code used other than as permitted herein. If you modify file(s)
-* with this exception, you may extend this exception to your version of the
-* file(s), but you are not obligated to do so. If you do not wish to do so,
-* delete this exception statement from your version. If you delete this
-* exception statement from all source files in the program, then also delete
-* it in the license file.
-*/
+ * Copyright (C) 2017 MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
#include "mongo/db/op_observer_impl.h"
@@ -311,17 +311,17 @@ public:
* statement id.
*/
void simulateSessionWrite(OperationContext* opCtx,
- ScopedSession session,
+ TransactionParticipant* txnParticipant,
NamespaceString nss,
TxnNumber txnNum,
StmtId stmtId) {
- session->beginOrContinueTxn(opCtx, txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
{
AutoGetCollection autoColl(opCtx, nss, MODE_IX);
WriteUnitOfWork wuow(opCtx);
auto opTime = repl::OpTime(Timestamp(10, 1), 1); // Dummy timestamp.
- session->onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx, txnNum, {stmtId}, opTime, Date_t::now(), boost::none);
wuow.commit();
}
@@ -337,22 +337,23 @@ TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessio
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
- session->refreshFromStorageIfNeeded(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
// Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
- simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);
+ simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
// Check that the statement executed.
- ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
// The OpObserver should invalidate in-memory session state, so the check after this should
// fail.
OpObserver::RollbackObserverInfo rbInfo;
rbInfo.rollbackSessionIds = {UUID::gen()};
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
- ASSERT_THROWS_CODE(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
DBException,
ErrorCodes::ConflictingOperationInProgress);
}
@@ -367,21 +368,22 @@ TEST_F(OpObserverSessionCatalogTest,
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
- session->refreshFromStorageIfNeeded(opCtx.get());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
// Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
- simulateSessionWrite(opCtx.get(), session, nss, txnNum, stmtId);
+ simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
// Check that the statement executed.
- ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
// The OpObserver should not invalidate the in-memory session state, so the check after this
// should still succeed.
OpObserver::RollbackObserverInfo rbInfo;
opObserver.onReplicationRollback(opCtx.get(), rbInfo);
- ASSERT(session->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
}
/**
@@ -504,7 +506,10 @@ public:
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
_session = sessionCatalog->getOrCreateSession(opCtx(), sessionId);
- _session->get()->refreshFromStorageIfNeeded(opCtx());
+
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
+
opCtx()->setLogicalSessionId(sessionId);
_opObserver.emplace();
_times.emplace(opCtx());
@@ -556,17 +561,13 @@ protected:
ASSERT(txnRecord.getState() == txnState);
ASSERT_EQ(txnState != boost::none,
txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
+
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session());
if (!opTime.isNull()) {
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
- ASSERT_EQ(opTime, session()->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
} else {
- ASSERT_EQ(txnRecord.getLastWriteOpTime(), session()->getLastWriteOpTime(txnNum));
- }
-
- session()->invalidate();
- session()->refreshFromStorageIfNeeded(opCtx());
- if (!opTime.isNull()) {
- ASSERT_EQ(opTime, session()->getLastWriteOpTime(txnNum));
+ ASSERT_EQ(txnRecord.getLastWriteOpTime(), txnParticipant->getLastWriteOpTime(txnNum));
}
}
@@ -715,8 +716,6 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedCommitTest) {
const auto prepareSlot = repl::getNextOpTime(opCtx());
prepareTimestamp = prepareSlot.opTime.getTimestamp();
opObserver().onTransactionPrepare(opCtx(), prepareSlot);
- session()->lockTxnNumber(txnNum,
- Status(ErrorCodes::OperationFailed, "unittest lock failed"));
commitSlot = repl::getNextOpTime(opCtx());
}
@@ -785,9 +784,6 @@ TEST_F(OpObserverTransactionTest, TransactionalPreparedAbortTest) {
txnParticipant->transitionToPreparedforTest();
const auto prepareSlot = repl::getNextOpTime(opCtx());
opObserver().onTransactionPrepare(opCtx(), prepareSlot);
- session()->lockTxnNumber(txnNum,
- Status(ErrorCodes::OperationFailed, "unittest lock failed"));
-
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -944,10 +940,6 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
opObserver().onTransactionPrepare(opCtx(), slot);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
txnParticipant->transitionToPreparedforTest();
- session()->lockTxnNumber(txnNum,
- {ErrorCodes::PreparedTransactionInProgress,
- "unittest mock prepare transaction number lock"});
-
abortSlot = repl::getNextOpTime(opCtx());
}
@@ -957,7 +949,6 @@ TEST_F(OpObserverTransactionTest, AbortingPreparedTransactionWritesToTransaction
opObserver().onTransactionAbort(opCtx(), abortSlot);
txnParticipant->transitionToAbortedforTest();
- session()->unlockTxnNumber();
txnParticipant->stashTransactionResources(opCtx());
// Abort the storage-transaction without calling the OpObserver.
@@ -1027,9 +1018,6 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
opObserver().onTransactionPrepare(opCtx(), slot);
opCtx()->recoveryUnit()->setPrepareTimestamp(slot.opTime.getTimestamp());
txnParticipant->transitionToPreparedforTest();
- session()->lockTxnNumber(txnNum,
- {ErrorCodes::PreparedTransactionInProgress,
- "unittest mock prepare transaction number lock"});
}
OplogSlot commitSlot = repl::getNextOpTime(opCtx());
@@ -1041,8 +1029,6 @@ TEST_F(OpObserverTransactionTest, CommittingPreparedTransactionWritesToTransacti
opCtx()->lockState()->unsetMaxLockTimeout();
opObserver().onTransactionCommit(opCtx(), commitSlot, prepareOpTime.getTimestamp());
- session()->unlockTxnNumber();
-
assertTxnRecord(txnNum, commitOpTime, DurableTxnStateEnum::kCommitted);
}
diff --git a/src/mongo/db/operation_context_session_mongod.cpp b/src/mongo/db/operation_context_session_mongod.cpp
index 0e79480b31c..bcb7a889cd3 100644
--- a/src/mongo/db/operation_context_session_mongod.cpp
+++ b/src/mongo/db/operation_context_session_mongod.cpp
@@ -26,6 +26,8 @@
* it in the license file.
*/
+#include "mongo/platform/basic.h"
+
#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/transaction_coordinator_factory.h"
@@ -40,12 +42,10 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o
boost::optional<bool> coordinator)
: _operationContextSession(opCtx, shouldCheckOutSession) {
if (shouldCheckOutSession && !opCtx->getClient()->isInDirectClient()) {
- auto session = OperationContextSession::get(opCtx);
- invariant(session);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx);
- auto clientTxnNumber = *opCtx->getTxnNumber();
- session->refreshFromStorageIfNeeded(opCtx);
- session->beginOrContinueTxn(opCtx, clientTxnNumber);
+ const auto clientTxnNumber = *opCtx->getTxnNumber();
if (startTransaction && *startTransaction) {
// If this shard has been selected as the coordinator, set up the coordinator state
@@ -55,7 +55,6 @@ OperationContextSessionMongod::OperationContextSessionMongod(OperationContext* o
}
}
- auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinue(clientTxnNumber, autocommit, startTransaction);
}
}
@@ -64,16 +63,9 @@ OperationContextSessionMongodWithoutRefresh::OperationContextSessionMongodWithou
OperationContext* opCtx)
: _operationContextSession(opCtx, true /* checkout */) {
invariant(!opCtx->getClient()->isInDirectClient());
- auto session = OperationContextSession::get(opCtx);
- invariant(session);
-
- auto clientTxnNumber = *opCtx->getTxnNumber();
- // Session is refreshed, but the transaction participant isn't.
- session->refreshFromStorageIfNeeded(opCtx);
- session->beginOrContinueTxn(opCtx, clientTxnNumber);
+ const auto clientTxnNumber = *opCtx->getTxnNumber();
- auto txnParticipant = TransactionParticipant::get(opCtx);
- invariant(txnParticipant);
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
txnParticipant->beginOrContinueTransactionUnconditionally(clientTxnNumber);
}
diff --git a/src/mongo/db/operation_context_session_mongod.h b/src/mongo/db/operation_context_session_mongod.h
index 9c0a13e4e0e..f455a7d2ad0 100644
--- a/src/mongo/db/operation_context_session_mongod.h
+++ b/src/mongo/db/operation_context_session_mongod.h
@@ -54,11 +54,11 @@ private:
};
/**
- * Similar to OperationContextSessionMongod, but this starts a new transaction unconditionally
- * without refreshing the state from disk. The session reloads the state from disk but
- * the transaction participant will not use the on-disk state to refresh its in-memory state.
+ * Similar to OperationContextSessionMongod, but marks the TransactionParticipant as valid without
+ * refreshing from disk and starts a new transaction unconditionally.
*
- * This is used for transaction secondary application and recovery.
+ * NOTE: Only used by the replication oplog application logic on secondaries in order to replay
+ * prepared transactions.
*/
class OperationContextSessionMongodWithoutRefresh {
public:
diff --git a/src/mongo/db/ops/write_ops_exec.cpp b/src/mongo/db/ops/write_ops_exec.cpp
index d9dc53085b4..5a592da38a9 100644
--- a/src/mongo/db/ops/write_ops_exec.cpp
+++ b/src/mongo/db/ops/write_ops_exec.cpp
@@ -511,11 +511,9 @@ WriteResult performInserts(OperationContext* opCtx,
} else {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- auto session = OperationContextSession::get(opCtx);
- invariant(session);
if (!txnParticipant->inMultiDocumentTransaction() &&
- session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(),
- stmtId)) {
+ txnParticipant->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(),
+ stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
@@ -678,10 +676,9 @@ WriteResult performUpdates(OperationContext* opCtx, const write_ops::Update& who
for (auto&& singleOp : wholeOp.getUpdates()) {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- auto session = OperationContextSession::get(opCtx);
if (!txnParticipant->inMultiDocumentTransaction()) {
- if (auto entry =
- session->checkStatementExecuted(opCtx, *opCtx->getTxnNumber(), stmtId)) {
+ if (auto entry = txnParticipant->checkStatementExecuted(
+ opCtx, *opCtx->getTxnNumber(), stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(parseOplogEntryForUpdate(*entry));
@@ -821,9 +818,9 @@ WriteResult performDeletes(OperationContext* opCtx, const write_ops::Delete& who
for (auto&& singleOp : wholeOp.getDeletes()) {
const auto stmtId = getStmtIdForWriteOp(opCtx, wholeOp, stmtIdIndex++);
if (opCtx->getTxnNumber()) {
- auto session = OperationContextSession::get(opCtx);
if (!txnParticipant->inMultiDocumentTransaction() &&
- session->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(), stmtId)) {
+ txnParticipant->checkStatementExecutedNoOplogEntryFetch(*opCtx->getTxnNumber(),
+ stmtId)) {
containsRetry = true;
RetryableWritesStats::get(opCtx)->incrementRetriedStatementsCount();
out.results.emplace_back(makeWriteResultForInsertOrDeleteRetry());
diff --git a/src/mongo/db/repl/oplog.cpp b/src/mongo/db/repl/oplog.cpp
index cacb87ade6a..801a9dcd31b 100644
--- a/src/mongo/db/repl/oplog.cpp
+++ b/src/mongo/db/repl/oplog.cpp
@@ -538,7 +538,9 @@ std::vector<OpTime> logInsertOps(OperationContext* opCtx,
if (session) {
sessionInfo.setSessionId(*opCtx->getLogicalSessionId());
sessionInfo.setTxnNumber(*opCtx->getTxnNumber());
- oplogLink.prevOpTime = session->getLastWriteOpTime(*opCtx->getTxnNumber());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(*opCtx->getTxnNumber());
}
auto timestamps = stdx::make_unique<Timestamp[]>(count);
@@ -1200,7 +1202,6 @@ Status applyOperation_inlock(OperationContext* opCtx,
}
return true;
}();
-
invariant(!assignOperationTimestamp || !fieldTs.eoo(),
str::stream() << "Oplog entry did not have 'ts' field when expected: " << redact(op));
diff --git a/src/mongo/db/repl/session_update_tracker.cpp b/src/mongo/db/repl/session_update_tracker.cpp
index 4028d6980c0..0673d0dec2a 100644
--- a/src/mongo/db/repl/session_update_tracker.cpp
+++ b/src/mongo/db/repl/session_update_tracker.cpp
@@ -31,11 +31,77 @@
#include "mongo/db/repl/session_update_tracker.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/session.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/util/assert_util.h"
namespace mongo {
namespace repl {
+namespace {
+
+/**
+ * Constructs a new oplog entry if the given entry has transaction state embedded within in. The new
+ * oplog entry will contain the operation needed to replicate the transaction table.
+ *
+ * Returns boost::none if the given oplog doesn't have any transaction state or does not support
+ * update to the transaction table.
+ */
+boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
+ const repl::OplogEntry& entry) {
+ auto sessionInfo = entry.getOperationSessionInfo();
+ if (!sessionInfo.getTxnNumber()) {
+ return boost::none;
+ }
+
+ invariant(sessionInfo.getSessionId());
+ invariant(entry.getWallClockTime());
+
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(*sessionInfo.getSessionId());
+ newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
+ newTxnRecord.setLastWriteOpTime(entry.getOpTime());
+ newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
+
+ switch (entry.getCommandType()) {
+ case repl::OplogEntry::CommandType::kApplyOps:
+ newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
+ : DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kCommitTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
+ break;
+ case repl::OplogEntry::CommandType::kAbortTransaction:
+ newTxnRecord.setState(DurableTxnStateEnum::kAborted);
+ break;
+ default:
+ break;
+ }
+ return newTxnRecord.toBSON();
+ }();
+
+ return repl::OplogEntry(
+ entry.getOpTime(),
+ 0, // hash
+ repl::OpTypeEnum::kUpdate,
+ NamespaceString::kSessionTransactionsTableNamespace,
+ boost::none, // uuid
+ false, // fromMigrate
+ repl::OplogEntry::kOplogVersion,
+ updateBSON,
+ BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
+ {}, // sessionInfo
+ true, // upsert
+ *entry.getWallClockTime(),
+ boost::none, // statementId
+ boost::none, // prevWriteOpTime
+ boost::none, // preImangeOpTime
+ boost::none // postImageOpTime
+ );
+}
+
+} // namespace
boost::optional<std::vector<OplogEntry>> SessionUpdateTracker::updateOrFlush(
const OplogEntry& entry) {
@@ -96,7 +162,7 @@ std::vector<OplogEntry> SessionUpdateTracker::flushAll() {
std::vector<OplogEntry> opList;
for (auto&& entry : _sessionsToUpdate) {
- auto newUpdate = Session::createMatchingTransactionTableUpdate(entry.second);
+ auto newUpdate = createMatchingTransactionTableUpdate(entry.second);
invariant(newUpdate);
opList.push_back(std::move(*newUpdate));
}
@@ -117,7 +183,7 @@ std::vector<OplogEntry> SessionUpdateTracker::_flushForQueryPredicate(
}
std::vector<OplogEntry> opList;
- auto updateOplog = Session::createMatchingTransactionTableUpdate(iter->second);
+ auto updateOplog = createMatchingTransactionTableUpdate(iter->second);
invariant(updateOplog);
opList.push_back(std::move(*updateOplog));
_sessionsToUpdate.erase(iter);
diff --git a/src/mongo/db/repl/sync_tail.cpp b/src/mongo/db/repl/sync_tail.cpp
index 5025b48ff23..9a2f06d1039 100644
--- a/src/mongo/db/repl/sync_tail.cpp
+++ b/src/mongo/db/repl/sync_tail.cpp
@@ -73,6 +73,7 @@
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/stats/timer_stats.h"
#include "mongo/db/storage/recovery_unit.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/exit.h"
#include "mongo/util/fail_point_service.h"
@@ -82,11 +83,7 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
-
-using std::endl;
-
namespace repl {
-
namespace {
MONGO_FAIL_POINT_DEFINE(pauseBatchApplicationBeforeCompletion);
@@ -1005,12 +1002,12 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplog
bool ok = missingObjReader.connect(*source);
if (!ok) {
warning() << "network problem detected while connecting to the "
- << "sync source, attempt " << retryCount << " of " << retryMax << endl;
+ << "sync source, attempt " << retryCount << " of " << retryMax;
continue; // try again
}
} catch (const NetworkException&) {
warning() << "network problem detected while connecting to the "
- << "sync source, attempt " << retryCount << " of " << retryMax << endl;
+ << "sync source, attempt " << retryCount << " of " << retryMax;
continue; // try again
}
@@ -1037,10 +1034,10 @@ BSONObj SyncTail::getMissingDoc(OperationContext* opCtx, const OplogEntry& oplog
}
} catch (const NetworkException&) {
warning() << "network problem detected while fetching a missing document from the "
- << "sync source, attempt " << retryCount << " of " << retryMax << endl;
+ << "sync source, attempt " << retryCount << " of " << retryMax;
continue; // try again
} catch (DBException& e) {
- error() << "assertion fetching missing object: " << redact(e) << endl;
+ error() << "assertion fetching missing object: " << redact(e);
throw;
}
diff --git a/src/mongo/db/repl/sync_tail_test.cpp b/src/mongo/db/repl/sync_tail_test.cpp
index 32cb9afa7f2..91944c0ebe7 100644
--- a/src/mongo/db/repl/sync_tail_test.cpp
+++ b/src/mongo/db/repl/sync_tail_test.cpp
@@ -63,6 +63,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/stdx/mutex.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index ede0ae0f8b5..9d7307eca00 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -247,20 +247,20 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
invariant(oplogEntry.getWallClockTime());
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
- scopedSession->refreshFromStorageIfNeeded(opCtx);
- if (!scopedSession->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
+ auto const txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx);
+
+ if (!txnParticipant->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
// Don't continue migrating the transaction history
return lastResult;
}
- auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
- txnParticipant->checkForNewTxnNumber();
-
BSONObj object(result.isPrePostImage
? oplogEntry.getObject()
: BSON(SessionCatalogMigrationDestination::kSessionMigrateOplogTag << 1));
auto oplogLink = extractPrePostImageTs(lastResult, oplogEntry);
- oplogLink.prevOpTime = scopedSession->getLastWriteOpTime(result.txnNum);
+ oplogLink.prevOpTime = txnParticipant->getLastWriteOpTime(result.txnNum);
writeConflictRetry(
opCtx,
@@ -301,7 +301,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
// Do not call onWriteOpCompletedOnPrimary if we inserted a pre/post image, because the
// next oplog will contain the real operation
if (!result.isPrePostImage) {
- scopedSession->onMigrateCompletedOnPrimary(
+ txnParticipant->onMigrateCompletedOnPrimary(
opCtx, result.txnNum, {stmtId}, oplogOpTime, *oplogEntry.getWallClockTime());
}
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 7ca1a4a6294..576290f1ca4 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -46,6 +46,7 @@
#include "mongo/db/session_catalog.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/s/catalog/sharding_catalog_client_mock.h"
#include "mongo/s/catalog/type_shard.h"
@@ -172,7 +173,9 @@ public:
const LogicalSessionId& sessionId,
const TxnNumber& txnNum) {
auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
- scopedSession->beginOrContinueTxn(opCtx, txnNum);
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
return scopedSession;
}
@@ -201,7 +204,8 @@ public:
Session* session,
TxnNumber txnNumber,
StmtId stmtId) {
- auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session);
+ auto oplog = txnParticipant->checkStatementExecuted(opCtx, txnNumber, stmtId);
ASSERT_TRUE(oplog);
}
@@ -210,7 +214,8 @@ public:
TxnNumber txnNumber,
StmtId stmtId,
repl::OplogEntry& expectedOplog) {
- auto oplog = session->checkStatementExecuted(opCtx, txnNumber, stmtId);
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session);
+ auto oplog = txnParticipant->checkStatementExecuted(opCtx, txnNumber, stmtId);
ASSERT_TRUE(oplog);
checkOplogWithNestedOplog(expectedOplog, *oplog);
}
@@ -355,7 +360,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxn) {
finishSessionExpectSuccess(&sessionMigration);
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -416,7 +422,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldOnlyStoreHistoryOfLatestTxn
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, txnNum);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(txnNum));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(txnNum));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -467,7 +474,8 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithSameTxnInSeparate
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -534,7 +542,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
{
auto session = getSessionWithTxn(opCtx, sessionId1, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog1, historyIter.next(opCtx));
ASSERT_FALSE(historyIter.hasNext());
@@ -544,8 +555,10 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithDifferentSession)
{
auto session = getSessionWithTxn(opCtx, sessionId2, 42);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(42));
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(42));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -607,7 +620,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldNotNestAlreadyNestedOplog)
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplog(oplog2, historyIter.next(opCtx));
@@ -656,8 +671,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePreImageFindA
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -744,8 +760,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandlePostImageFind
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -835,8 +852,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldBeAbleToHandleFindAndModify
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
auto nextOplog = historyIter.next(opCtx);
@@ -934,8 +952,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OlderTxnShouldBeIgnored) {
ASSERT_TRUE(SessionCatalogMigrationDestination::State::Done == sessionMigration.getState());
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
@@ -995,8 +1014,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, NewerTxnWriteShouldNotBeOverwritt
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 20);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(20));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(20));
ASSERT_TRUE(historyIter.hasNext());
auto oplog = historyIter.next(opCtx);
ASSERT_BSONOBJ_EQ(BSON("_id"
@@ -1174,8 +1194,9 @@ TEST_F(SessionCatalogMigrationDestinationTest,
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog2, historyIter.next(opCtx));
@@ -1477,8 +1498,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, ShouldIgnoreAlreadyExecutedStatem
finishSessionExpectSuccess(&sessionMigration);
auto session = getSessionWithTxn(opCtx, sessionId, 19);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(19));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(19));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -1517,13 +1539,13 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
Date_t::now(), // wall clock time
23); // statement id
- auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
- OpTypeEnum::kNoop, // op type
- {}, // o
- Session::kDeadEndSentinel, // o2
- sessionInfo, // session info
- Date_t::now(), // wall clock time
- kIncompleteHistoryStmtId); // statement id
+ auto oplog2 = makeOplogEntry(OpTime(Timestamp(80, 2), 1), // optime
+ OpTypeEnum::kNoop, // op type
+ {}, // o
+ TransactionParticipant::kDeadEndSentinel, // o2
+ sessionInfo, // session info
+ Date_t::now(), // wall clock time
+ kIncompleteHistoryStmtId); // statement id
auto oplog3 = makeOplogEntry(OpTime(Timestamp(60, 2), 1), // optime
OpTypeEnum::kInsert, // op type
@@ -1544,8 +1566,9 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
auto opCtx = operationContext();
auto session = getSessionWithTxn(opCtx, sessionId, 2);
- TransactionHistoryIterator historyIter(session->getLastWriteOpTime(2));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ TransactionHistoryIterator historyIter(txnParticipant->getLastWriteOpTime(2));
ASSERT_TRUE(historyIter.hasNext());
checkOplogWithNestedOplog(oplog3, historyIter.next(opCtx));
@@ -1559,7 +1582,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, OplogEntriesWithIncompleteHistory
checkStatementExecuted(opCtx, session.get(), 2, 23, oplog1);
checkStatementExecuted(opCtx, session.get(), 2, 5, oplog3);
- ASSERT_THROWS(session->checkStatementExecuted(opCtx, 2, 38), AssertionException);
+ ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx, 2, 38), AssertionException);
}
} // namespace
diff --git a/src/mongo/db/s/session_catalog_migration_source.cpp b/src/mongo/db/s/session_catalog_migration_source.cpp
index 6e1ffc21134..9835843a9aa 100644
--- a/src/mongo/db/s/session_catalog_migration_source.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/transaction_history_iterator.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/platform/random.h"
#include "mongo/stdx/memory.h"
@@ -106,14 +107,14 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
* Creates a special "write history lost" sentinel oplog entry.
*/
repl::OplogEntry makeSentinelOplogEntry(OperationSessionInfo sessionInfo, Date_t wallClockTime) {
- return makeOplogEntry({}, // optime
- hashGenerator.nextInt64(), // hash
- repl::OpTypeEnum::kNoop, // op type
- {}, // o
- Session::kDeadEndSentinel, // o2
- sessionInfo, // session info
- wallClockTime, // wall clock time
- kIncompleteHistoryStmtId); // statement id
+ return makeOplogEntry({}, // optime
+ hashGenerator.nextInt64(), // hash
+ repl::OpTypeEnum::kNoop, // op type
+ {}, // o
+ TransactionParticipant::kDeadEndSentinel, // o2
+ sessionInfo, // session info
+ wallClockTime, // wall clock time
+ kIncompleteHistoryStmtId); // statement id
}
} // namespace
diff --git a/src/mongo/db/s/session_catalog_migration_source_test.cpp b/src/mongo/db/s/session_catalog_migration_source_test.cpp
index dc2ae5677de..b8a99afb851 100644
--- a/src/mongo/db/s/session_catalog_migration_source_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_source_test.cpp
@@ -35,6 +35,7 @@
#include "mongo/db/s/session_catalog_migration_source.h"
#include "mongo/db/session.h"
#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/executor/remote_command_request.h"
#include "mongo/unittest/unittest.h"
@@ -593,7 +594,7 @@ TEST_F(SessionCatalogMigrationSourceTest, ReturnsDeadEndSentinelForIncompleteHis
auto oplog = *nextOplogResult.oplog;
ASSERT_TRUE(oplog.getObject2());
- ASSERT_BSONOBJ_EQ(Session::kDeadEndSentinel, *oplog.getObject2());
+ ASSERT_BSONOBJ_EQ(TransactionParticipant::kDeadEndSentinel, *oplog.getObject2());
ASSERT_TRUE(oplog.getStatementId());
ASSERT_EQ(kIncompleteHistoryStmtId, *oplog.getStatementId());
ASSERT_TRUE(oplog.getWallClockTime());
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 8182d161303..085de43e94b 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -26,683 +26,14 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
-#define LOG_FOR_TRANSACTION(level) \
- MONGO_LOG_COMPONENT(level, ::mongo::logger::LogComponent::kTransaction)
#include "mongo/platform/basic.h"
#include "mongo/db/session.h"
-#include "mongo/db/catalog/index_catalog.h"
-#include "mongo/db/concurrency/lock_state.h"
-#include "mongo/db/concurrency/locker.h"
-#include "mongo/db/concurrency/write_conflict_exception.h"
-#include "mongo/db/db_raii.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/index/index_access_method.h"
-#include "mongo/db/namespace_string.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/update.h"
-#include "mongo/db/query/get_executor.h"
-#include "mongo/db/retryable_writes_stats.h"
-#include "mongo/db/transaction_history_iterator.h"
-#include "mongo/stdx/memory.h"
-#include "mongo/transport/transport_layer.h"
-#include "mongo/util/fail_point_service.h"
-#include "mongo/util/log.h"
-#include "mongo/util/mongoutils/str.h"
-
namespace mongo {
-namespace {
-
-void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
- TxnNumber txnNumber,
- StmtId stmtId,
- const repl::OpTime& firstOpTime,
- const repl::OpTime& secondOpTime) {
- severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
- << txnNumber << " ] was committed once with opTime " << firstOpTime
- << " and a second time with opTime " << secondOpTime
- << ". This indicates possible data corruption or server bug and the process will be "
- "terminated.";
- fassertFailed(40526);
-}
-
-struct ActiveTransactionHistory {
- boost::optional<SessionTxnRecord> lastTxnRecord;
- Session::CommittedStatementTimestampMap committedStatements;
- bool transactionCommitted{false};
- bool hasIncompleteHistory{false};
-};
-
-ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
- const LogicalSessionId& lsid) {
- ActiveTransactionHistory result;
-
- result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
- DBDirectClient client(opCtx);
- auto result =
- client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
- {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
- if (result.isEmpty()) {
- return boost::none;
- }
-
- return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"),
- result);
- }();
-
- if (!result.lastTxnRecord) {
- return result;
- }
-
- auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
- while (it.hasNext()) {
- try {
- const auto entry = it.next(opCtx);
- invariant(entry.getStatementId());
-
- if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
- // Only the dead end sentinel can have this id for oplog write history
- invariant(entry.getObject2());
- invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
- result.hasIncompleteHistory = true;
- continue;
- }
-
- const auto insertRes =
- result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(lsid,
- result.lastTxnRecord->getTxnNum(),
- *entry.getStatementId(),
- existingOpTime,
- entry.getOpTime());
- }
-
- // applyOps oplog entry marks the commit of a transaction.
- if (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
- result.transactionCommitted = true;
- }
- } catch (const DBException& ex) {
- if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
- result.hasIncompleteHistory = true;
- break;
- }
-
- throw;
- }
- }
-
- return result;
-}
-
-void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
- // Current code only supports replacement update.
- dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));
-
- AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
-
- uassert(40527,
- str::stream() << "Unable to persist transaction state because the session transaction "
- "collection is missing. This indicates that the "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " collection has been manually deleted.",
- autoColl.getCollection());
-
- WriteUnitOfWork wuow(opCtx);
-
- auto collection = autoColl.getCollection();
- auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);
-
- uassert(40672,
- str::stream() << "Failed to fetch _id index for "
- << NamespaceString::kSessionTransactionsTableNamespace.ns(),
- idIndex);
-
- auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);
- // Since we are looking up a key inside the _id index, create a key object consisting of only
- // the _id field.
- auto idToFetch = updateRequest.getQuery().firstElement();
- auto toUpdateIdDoc = idToFetch.wrap();
- dassert(idToFetch.fieldNameStringData() == "_id"_sd);
- auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
- auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();
-
- if (recordId.isNull()) {
- // Upsert case.
- auto status = collection->insertDocument(
- opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false);
-
- if (status == ErrorCodes::DuplicateKey) {
- throw WriteConflictException();
- }
-
- uassertStatusOK(status);
- wuow.commit();
- return;
- }
-
- auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
- auto originalDoc = originalRecordData.toBson();
-
- invariant(collection->getDefaultCollator() == nullptr);
- boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr));
-
- auto matcher =
- fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
- if (!matcher->matchesBSON(originalDoc)) {
- // Document no longer match what we expect so throw WCE to make the caller re-examine.
- throw WriteConflictException();
- }
-
- CollectionUpdateArgs args;
- args.update = updateRequest.getUpdates();
- args.criteria = toUpdateIdDoc;
- args.fromMigrate = false;
-
- collection->updateDocument(opCtx,
- recordId,
- Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
- updateRequest.getUpdates(),
- false, // indexesAffected = false because _id is the only index
- nullptr,
- &args);
-
- wuow.commit();
-}
-
-// Failpoint which allows different failure actions to happen after each write. Supports the
-// parameters below, which can be combined with each other (unless explicitly disallowed):
-//
-// closeConnection (bool, default = true): Closes the connection on which the write was executed.
-// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
-// code will be thrown, which will cause the write to not commit; if not specified, the write
-// will be allowed to commit.
-MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
-
-} // namespace
-
-const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
-
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
-void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
- if (opCtx->getClient()->isInDirectClient()) {
- return;
- }
-
- invariant(!opCtx->lockState()->isLocked());
- invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
- repl::ReadConcernLevel::kLocalReadConcern);
-
- stdx::unique_lock<stdx::mutex> ul(_mutex);
-
- while (!_isValid) {
- const int numInvalidations = _numInvalidations;
-
- ul.unlock();
-
- auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId);
-
- ul.lock();
-
- // Protect against concurrent refreshes or invalidations
- if (!_isValid && _numInvalidations == numInvalidations) {
- _isValid = true;
- _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
-
- if (_lastWrittenSessionRecord) {
- if (!_lastRefreshState) {
- _lastRefreshState.emplace();
- }
-
- _lastRefreshState->refreshCount++;
-
- _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
- _lastRefreshState->txnNumber = _activeTxnNumber;
-
- _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
- _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
- _lastRefreshState->isCommitted = activeTxnHistory.transactionCommitted;
- }
-
- break;
- }
- }
-}
-
-void Session::beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _beginOrContinueTxn(lg, txnNumber);
-}
-
-void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t lastStmtIdWriteDate,
- boost::optional<DurableTxnStateEnum> txnState) {
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
-
- stdx::unique_lock<stdx::mutex> ul(_mutex);
-
- // Sanity check that we don't double-execute statements
- for (const auto stmtId : stmtIdsWritten) {
- const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
- if (stmtOpTime) {
- fassertOnRepeatedExecution(
- _sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
- }
- }
-
- const auto updateRequest =
- _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
-
- ul.unlock();
-
- repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
-
- updateSessionEntry(opCtx, updateRequest);
- _registerUpdateCacheOnCommit(
- opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
-}
-
-bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) {
- beginOrContinueTxn(opCtx, txnNumber);
-
- try {
- if (checkStatementExecuted(opCtx, txnNumber, stmtId)) {
- return false;
- }
- } catch (const DBException& ex) {
- // If the transaction chain was truncated on the recipient shard, then we
- // are most likely copying from a session that hasn't been touched on the
- // recipient shard for a very long time but could be recent on the donor.
- // We continue copying regardless to get the entire transaction from the donor.
- if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
- throw;
- }
- if (stmtId == kIncompleteHistoryStmtId) {
- return false;
- }
- }
-
- return true;
-}
-
-void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t oplogLastStmtIdWriteDate) {
- invariant(opCtx->lockState()->inAWriteUnitOfWork());
-
- stdx::unique_lock<stdx::mutex> ul(_mutex);
-
- _checkValid(ul);
- _checkIsActiveTransaction(ul, txnNumber);
-
- // We do not migrate transaction oplog entries.
- auto txnState = boost::none;
- const auto updateRequest = _makeUpdateRequest(
- ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState);
-
- ul.unlock();
-
- repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
-
- updateSessionEntry(opCtx, updateRequest);
- _registerUpdateCacheOnCommit(
- opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
-}
-
-void Session::invalidate() {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
- if (_isTxnNumberLocked) {
- invariant(_txnNumberLockConflictStatus);
- uasserted(50908,
- str::stream() << "cannot invalidate session because txnNumber is locked: "
- << *_txnNumberLockConflictStatus);
- }
-
- _isValid = false;
- _numInvalidations++;
-
- _lastWrittenSessionRecord.reset();
-
- _activeTxnNumber = kUninitializedTxnNumber;
- _activeTxnCommittedStatements.clear();
- _hasIncompleteHistory = false;
-}
-
-repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- _checkValid(lg);
- _checkIsActiveTransaction(lg, txnNumber);
-
- if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
- return {};
-
- return _lastWrittenSessionRecord->getLastWriteOpTime();
-}
-
-boost::optional<repl::OplogEntry> Session::checkStatementExecuted(OperationContext* opCtx,
- TxnNumber txnNumber,
- StmtId stmtId) const {
- const auto stmtTimestamp = [&] {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _checkStatementExecuted(lg, txnNumber, stmtId);
- }();
-
- if (!stmtTimestamp)
- return boost::none;
-
- TransactionHistoryIterator txnIter(*stmtTimestamp);
- while (txnIter.hasNext()) {
- const auto entry = txnIter.next(opCtx);
- invariant(entry.getStatementId());
- if (*entry.getStatementId() == stmtId)
- return entry;
- }
-
- MONGO_UNREACHABLE;
-}
-
-bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return bool(_checkStatementExecuted(lg, txnNumber, stmtId));
-}
-
-void Session::_beginOrContinueTxn(WithLock wl, TxnNumber txnNumber) {
-
- // Check whether the session information needs to be refreshed from disk.
- _checkValid(wl);
-
- // Check if the given transaction number is valid for this session. The transaction number must
- // be >= the active transaction number.
- _checkTxnValid(wl, txnNumber);
-
- //
- // Continue an active transaction.
- //
- if (txnNumber == _activeTxnNumber) {
- return;
- }
-
- invariant(txnNumber > _activeTxnNumber);
- _setActiveTxn(wl, txnNumber);
-
- LOG_FOR_TRANSACTION(4) << "New transaction started with txnNumber: " << txnNumber
- << " on session with lsid " << getSessionId().getId();
-}
-
-void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
- uassert(ErrorCodes::TransactionTooOld,
- str::stream() << "Cannot start transaction " << txnNumber << " on session "
- << getSessionId()
- << " because a newer transaction "
- << _activeTxnNumber
- << " has already started.",
- txnNumber >= _activeTxnNumber);
-}
-
-void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) {
- if (_isTxnNumberLocked) {
- invariant(_txnNumberLockConflictStatus);
- uassertStatusOK(*_txnNumberLockConflictStatus);
- }
-
- _activeTxnNumber = txnNumber;
- _activeTxnCommittedStatements.clear();
- _hasIncompleteHistory = false;
-}
-
-void Session::_checkValid(WithLock) const {
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Session " << getSessionId()
- << " was concurrently modified and the operation must be retried.",
- _isValid);
-}
-
-void Session::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Cannot perform operations on transaction " << txnNumber
- << " on session "
- << getSessionId()
- << " because a different transaction "
- << _activeTxnNumber
- << " is now active.",
- txnNumber == _activeTxnNumber);
-}
-
-boost::optional<repl::OpTime> Session::_checkStatementExecuted(WithLock wl,
- TxnNumber txnNumber,
- StmtId stmtId) const {
- _checkValid(wl);
- _checkIsActiveTransaction(wl, txnNumber);
-
- const auto it = _activeTxnCommittedStatements.find(stmtId);
- if (it == _activeTxnCommittedStatements.end()) {
- uassert(ErrorCodes::IncompleteTransactionHistory,
- str::stream() << "Incomplete history detected for transaction " << txnNumber
- << " on session "
- << _sessionId.toBSON(),
- !_hasIncompleteHistory);
-
- return boost::none;
- }
-
- invariant(_lastWrittenSessionRecord);
- invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
-
- return it->second;
-}
-
-Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
- _checkValid(wl);
- _checkIsActiveTransaction(wl, txnNumber);
-
- if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
- return {};
-
- return _lastWrittenSessionRecord->getLastWriteDate();
-}
-
-UpdateRequest Session::_makeUpdateRequest(WithLock,
- TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteOpTime,
- Date_t newLastWriteDate,
- boost::optional<DurableTxnStateEnum> newState) const {
- UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
-
- const auto updateBSON = [&] {
- SessionTxnRecord newTxnRecord;
- newTxnRecord.setSessionId(_sessionId);
- newTxnRecord.setTxnNum(newTxnNumber);
- newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
- newTxnRecord.setLastWriteDate(newLastWriteDate);
- newTxnRecord.setState(newState);
- return newTxnRecord.toBSON();
- }();
- updateRequest.setUpdates(updateBSON);
- updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON()));
- updateRequest.setUpsert(true);
-
- return updateRequest;
-}
-
-void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
- TxnNumber newTxnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime) {
- opCtx->recoveryUnit()->onCommit(
- [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
- boost::optional<Timestamp>) {
- RetryableWritesStats::get(getGlobalServiceContext())
- ->incrementTransactionsCollectionWriteCount();
-
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
- if (!_isValid)
- return;
-
- // The cache of the last written record must always be advanced after a write so that
- // subsequent writes have the correct point to start from.
- if (!_lastWrittenSessionRecord) {
- _lastWrittenSessionRecord.emplace();
-
- _lastWrittenSessionRecord->setSessionId(_sessionId);
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- } else {
- if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
- _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
-
- if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
- _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
- }
-
- if (newTxnNumber > _activeTxnNumber) {
- // This call is necessary in order to advance the txn number and reset the cached
- // state in the case where just before the storage transaction commits, the cache
- // entry gets invalidated and immediately refreshed while there were no writes for
- // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
- // and we will fail to update the cache even though the write was successful.
- _beginOrContinueTxn(lg, newTxnNumber);
- }
-
- if (newTxnNumber == _activeTxnNumber) {
- for (const auto stmtId : stmtIdsWritten) {
- if (stmtId == kIncompleteHistoryStmtId) {
- _hasIncompleteHistory = true;
- continue;
- }
-
- const auto insertRes =
- _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
- if (!insertRes.second) {
- const auto& existingOpTime = insertRes.first->second;
- fassertOnRepeatedExecution(_sessionId,
- newTxnNumber,
- stmtId,
- existingOpTime,
- lastStmtIdWriteOpTime);
- }
- }
- }
- });
-
- MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
- const auto& data = customArgs.getData();
-
- const auto closeConnectionElem = data["closeConnection"];
- if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
- opCtx->getClient()->session()->end();
- }
-
- const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
- if (!failBeforeCommitExceptionElem.eoo()) {
- const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
- uasserted(failureCode,
- str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber
- << " due to failpoint. The write must not be reflected.");
- }
- }
-}
-
-boost::optional<repl::OplogEntry> Session::createMatchingTransactionTableUpdate(
- const repl::OplogEntry& entry) {
- auto sessionInfo = entry.getOperationSessionInfo();
- if (!sessionInfo.getTxnNumber()) {
- return boost::none;
- }
-
- invariant(sessionInfo.getSessionId());
- invariant(entry.getWallClockTime());
-
- const auto updateBSON = [&] {
- SessionTxnRecord newTxnRecord;
- newTxnRecord.setSessionId(*sessionInfo.getSessionId());
- newTxnRecord.setTxnNum(*sessionInfo.getTxnNumber());
- newTxnRecord.setLastWriteOpTime(entry.getOpTime());
- newTxnRecord.setLastWriteDate(*entry.getWallClockTime());
-
- switch (entry.getCommandType()) {
- case repl::OplogEntry::CommandType::kApplyOps:
- newTxnRecord.setState(entry.shouldPrepare() ? DurableTxnStateEnum::kPrepared
- : DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kCommitTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kCommitted);
- break;
- case repl::OplogEntry::CommandType::kAbortTransaction:
- newTxnRecord.setState(DurableTxnStateEnum::kAborted);
- break;
- default:
- break;
- }
- return newTxnRecord.toBSON();
- }();
-
- return repl::OplogEntry(
- entry.getOpTime(),
- 0, // hash
- repl::OpTypeEnum::kUpdate,
- NamespaceString::kSessionTransactionsTableNamespace,
- boost::none, // uuid
- false, // fromMigrate
- repl::OplogEntry::kOplogVersion,
- updateBSON,
- BSON(SessionTxnRecord::kSessionIdFieldName << sessionInfo.getSessionId()->toBSON()),
- {}, // sessionInfo
- true, // upsert
- *entry.getWallClockTime(),
- boost::none, // statementId
- boost::none, // prevWriteOpTime
- boost::none, // preImangeOpTime
- boost::none // postImageOpTime
- );
-}
-
-boost::optional<Session::RefreshState> Session::getLastRefreshState() const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- return _lastRefreshState;
-}
-
-void Session::lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- uassert(50907,
- str::stream() << "cannot lock txnNumber to " << lockThisNumber
- << " because current txnNumber is "
- << _activeTxnNumber,
- _activeTxnNumber == lockThisNumber);
- // TODO: remove this if we need to support recursive locking.
- invariant(!_isTxnNumberLocked);
-
- _isTxnNumberLocked = true;
- _txnNumberLockConflictStatus = conflictError;
-}
-
-void Session::unlockTxnNumber() {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
- _isTxnNumberLocked = false;
- _txnNumberLockConflictStatus = boost::none;
-}
-
-bool Session::isLockedTxnNumber(const TxnNumber expectedLockedNumber) const {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- invariant(_activeTxnNumber == expectedLockedNumber,
- str::stream() << "Expected TxnNumber: " << expectedLockedNumber
- << ", Active TxnNumber: "
- << _activeTxnNumber);
- return _isTxnNumberLocked;
-}
-
void Session::setCurrentOperation(OperationContext* currentOperation) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_currentOperation);
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 2f79b3784f8..7b0c1a86857 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -28,196 +28,32 @@
#pragma once
-#include <boost/optional.hpp>
-
#include "mongo/base/disallow_copying.h"
-#include "mongo/bson/timestamp.h"
#include "mongo/db/logical_session_id.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/repl/oplog.h"
-#include "mongo/db/repl/oplog_entry.h"
-#include "mongo/db/session_txn_record_gen.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/unordered_map.h"
-#include "mongo/util/concurrency/with_lock.h"
+#include "mongo/util/decorable.h"
namespace mongo {
class OperationContext;
-class UpdateRequest;
/**
- * A write through cache for the state of a particular session. All modifications to the underlying
- * session transactions collection must be performed through an object of this class.
- *
- * The cache state can be 'up-to-date' (it is in sync with the persistent contents) or 'needs
- * refresh' (in which case refreshFromStorageIfNeeded needs to be called in order to make it
- * up-to-date).
+ * A decorable container for state associated with an active session running on a MongoD or MongoS
+ * server. Refer to SessionCatalog for more information on the semantics of sessions.
*/
class Session : public Decorable<Session> {
MONGO_DISALLOW_COPYING(Session);
public:
- using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
-
- static const BSONObj kDeadEndSentinel;
-
explicit Session(LogicalSessionId sessionId);
- const LogicalSessionId& getSessionId() const {
- return _sessionId;
- }
-
- struct RefreshState {
- long long refreshCount{0};
- TxnNumber txnNumber{kUninitializedTxnNumber};
- bool isCommitted{false};
- };
-
- /**
- * Blocking method, which loads the transaction state from storage if it has been marked as
- * needing refresh.
- *
- * In order to avoid the possibility of deadlock, this method must not be called while holding a
- * lock.
- */
- void refreshFromStorageIfNeeded(OperationContext* opCtx);
-
- /**
- * Starts a new transaction on the session, or continues an already active transaction. In this
- * context, a "transaction" is a sequence of operations associated with a transaction number.
- *
- * Throws an exception if:
- * - An attempt is made to start a transaction with number less than the latest
- * transaction this session has seen.
- * - The session has been invalidated.
- *
- * In order to avoid the possibility of deadlock, this method must not be called while holding a
- * lock. This method must also be called after refreshFromStorageIfNeeded has been called.
- */
- void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber);
-
- /**
- * Called after a write under the specified transaction completes while the node is a primary
- * and specifies the statement ids which were written. Must be called while the caller is still
- * in the write's WUOW. Updates the on-disk state of the session to match the specified
- * transaction/opTime and keeps the cached state in sync.
- *
- * 'txnState' is 'none' for retryable writes.
- *
- * Must only be called with the session checked-out.
- *
- * Throws if the session has been invalidated or the active transaction number doesn't match.
- */
- void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t lastStmtIdWriteDate,
- boost::optional<DurableTxnStateEnum> txnState);
-
/**
- * Helper function to begin a migration on a primary node.
- *
- * Returns whether the specified statement should be migrated at all or skipped.
- *
- * Not called with session checked out.
+ * The logical session id that this object represents.
*/
- bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId);
-
- /**
- * Called after an entry for the specified session and transaction has been written to the oplog
- * during chunk migration, while the node is still primary. Must be called while the caller is
- * still in the oplog write's WUOW. Updates the on-disk state of the session to match the
- * specified transaction/opTime and keeps the cached state in sync.
- *
- * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary
- * and doesn't require the session to be checked-out.
- *
- * Throws if the session has been invalidated or the active transaction number is newer than the
- * one specified.
- */
- void onMigrateCompletedOnPrimary(OperationContext* opCtx,
- TxnNumber txnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteOpTime,
- Date_t oplogLastStmtIdWriteDate);
-
- /**
- * Marks the session as requiring refresh. Used when the session state has been modified
- * externally, such as through a direct write to the transactions table.
- */
- void invalidate();
-
- /**
- * Returns the op time of the last committed write for this session and transaction. If no write
- * has completed yet, returns an empty timestamp.
- *
- * Throws if the session has been invalidated or the active transaction number doesn't match.
- */
- repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const;
-
- /**
- * Checks whether the given statementId for the specified transaction has already executed and
- * if so, returns the oplog entry which was generated by that write. If the statementId hasn't
- * executed, returns boost::none.
- *
- * Must only be called with the session checked-out.
- *
- * Throws if the session has been invalidated or the active transaction number doesn't match.
- */
- boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx,
- TxnNumber txnNumber,
- StmtId stmtId) const;
-
- /**
- * Checks whether the given statementId for the specified transaction has already executed
- * without fetching the oplog entry which was generated by that write.
- *
- * Must only be called with the session checked-out.
- *
- * Throws if the session has been invalidated or the active transaction number doesn't match.
- */
- bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const;
-
- TxnNumber getActiveTxnNumber() const {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- return _activeTxnNumber;
+ const LogicalSessionId& getSessionId() const {
+ return _sessionId;
}
/**
- * Returns a new oplog entry if the given entry has transaction state embedded within in.
- * The new oplog entry will contain the operation needed to replicate the transaction
- * table.
- * Returns boost::none if the given oplog doesn't have any transaction state or does not
- * support update to the transaction table.
- */
- static boost::optional<repl::OplogEntry> createMatchingTransactionTableUpdate(
- const repl::OplogEntry& entry);
-
- /**
- * Returns the state of the session from storage the last time a refresh occurred.
- */
- boost::optional<RefreshState> getLastRefreshState() const;
-
- /**
- * Attempt to lock the active TxnNumber of this session to the given number. This operation
- * can only succeed if it is equal to the current active TxnNumber. Also sets the error status
- * for any callers trying to modify the TxnNumber.
- */
- void lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError);
-
- /**
- * Release the lock on the active TxnNumber and allow it to be modified.
- */
- void unlockTxnNumber();
-
- /**
- * Returns if the given TxnNumber is locked.
- */
- bool isLockedTxnNumber(const TxnNumber expectedLockedNumber) const;
-
- /**
* Sets the current operation running on this Session.
*/
void setCurrentOperation(OperationContext* currentOperation);
@@ -234,40 +70,7 @@ public:
OperationContext* getCurrentOperation() const;
private:
- void _beginOrContinueTxn(WithLock, TxnNumber txnNumber);
-
- // Checks if there is a conflicting operation on the current Session
- void _checkValid(WithLock) const;
-
- // Checks that a new txnNumber is higher than the activeTxnNumber so
- // we don't start a txn that is too old.
- void _checkTxnValid(WithLock, TxnNumber txnNumber) const;
-
- void _setActiveTxn(WithLock, TxnNumber txnNumber);
-
- void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const;
-
- boost::optional<repl::OpTime> _checkStatementExecuted(WithLock,
- TxnNumber txnNumber,
- StmtId stmtId) const;
-
- // Returns the write date of the last committed write for this session and transaction. If no
- // write has completed yet, returns an empty date.
- //
- // Throws if the session has been invalidated or the active transaction number doesn't match.
- Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const;
-
- UpdateRequest _makeUpdateRequest(WithLock,
- TxnNumber newTxnNumber,
- const repl::OpTime& newLastWriteTs,
- Date_t newLastWriteDate,
- boost::optional<DurableTxnStateEnum> newState) const;
-
- void _registerUpdateCacheOnCommit(OperationContext* opCtx,
- TxnNumber newTxnNumber,
- std::vector<StmtId> stmtIdsWritten,
- const repl::OpTime& lastStmtIdWriteTs);
-
+ // The id of the session with which this object is associated
const LogicalSessionId _sessionId;
// Protects the member variables below.
@@ -276,40 +79,6 @@ private:
// A pointer back to the currently running operation on this Session, or nullptr if there
// is no operation currently running for the Session.
OperationContext* _currentOperation{nullptr};
-
- // Specifies whether the session information needs to be refreshed from storage
- bool _isValid{false};
-
- // Counter, incremented with each call to invalidate in order to discern invalidations, which
- // happen during refresh
- int _numInvalidations{0};
-
- // Set to true if incomplete history is detected. For example, when the oplog to a write was
- // truncated because it was too old.
- bool _hasIncompleteHistory{false};
-
- // Caches what is known to be the last written transaction record for the session
- boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;
-
- // Tracks the last seen txn number for the session and is always >= to the transaction number in
- // the last written txn record. When it is > than that in the last written txn record, this
- // means a new transaction has begun on the session, but it hasn't yet performed any writes.
- TxnNumber _activeTxnNumber{kUninitializedTxnNumber};
-
- // For the active txn, tracks which statement ids have been committed and at which oplog
- // opTime. Used for fast retryability check and retrieving the previous write's data without
- // having to scan through the oplog.
- CommittedStatementTimestampMap _activeTxnCommittedStatements;
-
- // Stores the state from last refresh.
- boost::optional<RefreshState> _lastRefreshState;
-
- // True if txnNumber cannot be modified.
- bool _isTxnNumberLocked{false};
-
- // The status to return when an operation tries to modify the active TxnNumber while it is
- // locked.
- boost::optional<Status> _txnNumberLockConflictStatus;
};
} // namespace mongo
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index d4d4634f36e..5d18e890666 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -173,7 +173,9 @@ void SessionCatalog::invalidateSessions(OperationContext* opCtx,
const auto invalidateSessionFn = [&](WithLock, SessionRuntimeInfoMap::iterator it) {
auto& sri = it->second;
- sri->txnState.invalidate();
+ auto const txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(&sri->txnState);
+ txnParticipant->invalidate();
// We cannot remove checked-out sessions from the cache, because operations expect to find
// them there to check back in
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 97bad1e279e..365202ba309 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -35,6 +35,7 @@
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
+#include "mongo/util/concurrency/with_lock.h"
namespace mongo {
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index f2fe8df56a0..b3e1d73c17b 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -28,22 +28,34 @@
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
+#define LOG_FOR_TRANSACTION(level) \
+ MONGO_LOG_COMPONENT(level, ::mongo::logger::LogComponent::kTransaction)
+
#include "mongo/platform/basic.h"
#include "mongo/db/transaction_participant.h"
+#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/d_concurrency.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/locker.h"
+#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/curop_failpoint_helpers.h"
+#include "mongo/db/db_raii.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/index/index_access_method.h"
#include "mongo/db/op_observer.h"
+#include "mongo/db/ops/update.h"
+#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/repl_client_info.h"
+#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/session.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/fill_locker_info.h"
+#include "mongo/db/transaction_history_iterator.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/net/socket_utils.h"
@@ -85,7 +97,184 @@ const auto getTransactionParticipant = Session::declareDecoration<TransactionPar
const StringMap<int> preparedTxnCmdWhitelist = {
{"abortTransaction", 1}, {"commitTransaction", 1}, {"prepareTransaction", 1}};
-} // unnamed namespace
+void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ StmtId stmtId,
+ const repl::OpTime& firstOpTime,
+ const repl::OpTime& secondOpTime) {
+ severe() << "Statement id " << stmtId << " from transaction [ " << lsid.toBSON() << ":"
+ << txnNumber << " ] was committed once with opTime " << firstOpTime
+ << " and a second time with opTime " << secondOpTime
+ << ". This indicates possible data corruption or server bug and the process will be "
+ "terminated.";
+ fassertFailed(40526);
+}
+
+struct ActiveTransactionHistory {
+ boost::optional<SessionTxnRecord> lastTxnRecord;
+ TransactionParticipant::CommittedStatementTimestampMap committedStatements;
+ bool transactionCommitted{false};
+ bool hasIncompleteHistory{false};
+};
+
+ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
+ const LogicalSessionId& lsid) {
+ // Since we are using DBDirectClient to read the transactions table and the oplog, we should
+ // never be reading from a snapshot, but directly from what is the latest on disk. This
+ // invariant guards against programming errors where the default read concern on the
+ // OperationContext could have been changed to something other than 'local'.
+ invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
+ repl::ReadConcernLevel::kLocalReadConcern);
+
+ ActiveTransactionHistory result;
+
+ result.lastTxnRecord = [&]() -> boost::optional<SessionTxnRecord> {
+ DBDirectClient client(opCtx);
+ auto result =
+ client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
+ if (result.isEmpty()) {
+ return boost::none;
+ }
+
+ return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"),
+ result);
+ }();
+
+ if (!result.lastTxnRecord) {
+ return result;
+ }
+
+ auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
+ while (it.hasNext()) {
+ try {
+ const auto entry = it.next(opCtx);
+ invariant(entry.getStatementId());
+
+ if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
+ // Only the dead end sentinel can have this id for oplog write history
+ invariant(entry.getObject2());
+ invariant(entry.getObject2()->woCompare(TransactionParticipant::kDeadEndSentinel) ==
+ 0);
+ result.hasIncompleteHistory = true;
+ continue;
+ }
+
+ const auto insertRes =
+ result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(lsid,
+ result.lastTxnRecord->getTxnNum(),
+ *entry.getStatementId(),
+ existingOpTime,
+ entry.getOpTime());
+ }
+
+ // applyOps oplog entry marks the commit of a transaction.
+ if (entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
+ result.transactionCommitted = true;
+ }
+ } catch (const DBException& ex) {
+ if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
+ result.hasIncompleteHistory = true;
+ break;
+ }
+
+ throw;
+ }
+ }
+
+ return result;
+}
+
+void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
+ // Current code only supports replacement update.
+ dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));
+
+ AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
+
+ uassert(40527,
+ str::stream() << "Unable to persist transaction state because the session transaction "
+ "collection is missing. This indicates that the "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " collection has been manually deleted.",
+ autoColl.getCollection());
+
+ WriteUnitOfWork wuow(opCtx);
+
+ auto collection = autoColl.getCollection();
+ auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);
+
+ uassert(40672,
+ str::stream() << "Failed to fetch _id index for "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ idIndex);
+
+ auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);
+ // Since we are looking up a key inside the _id index, create a key object consisting of only
+ // the _id field.
+ auto idToFetch = updateRequest.getQuery().firstElement();
+ auto toUpdateIdDoc = idToFetch.wrap();
+ dassert(idToFetch.fieldNameStringData() == "_id"_sd);
+ auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
+ auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();
+
+ if (recordId.isNull()) {
+ // Upsert case.
+ auto status = collection->insertDocument(
+ opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false);
+
+ if (status == ErrorCodes::DuplicateKey) {
+ throw WriteConflictException();
+ }
+
+ uassertStatusOK(status);
+ wuow.commit();
+ return;
+ }
+
+ auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
+ auto originalDoc = originalRecordData.toBson();
+
+ invariant(collection->getDefaultCollator() == nullptr);
+ boost::intrusive_ptr<ExpressionContext> expCtx(new ExpressionContext(opCtx, nullptr));
+
+ auto matcher =
+ fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
+ if (!matcher->matchesBSON(originalDoc)) {
+ // Document no longer match what we expect so throw WCE to make the caller re-examine.
+ throw WriteConflictException();
+ }
+
+ CollectionUpdateArgs args;
+ args.update = updateRequest.getUpdates();
+ args.criteria = toUpdateIdDoc;
+ args.fromMigrate = false;
+
+ collection->updateDocument(opCtx,
+ recordId,
+ Snapshotted<BSONObj>(startingSnapshotId, originalDoc),
+ updateRequest.getUpdates(),
+ false, // indexesAffected = false because _id is the only index
+ nullptr,
+ &args);
+
+ wuow.commit();
+}
+
+// Failpoint which allows different failure actions to happen after each write. Supports the
+// parameters below, which can be combined with each other (unless explicitly disallowed):
+//
+// closeConnection (bool, default = true): Closes the connection on which the write was executed.
+// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
+// code will be thrown, which will cause the write to not commit; if not specified, the write
+// will be allowed to commit.
+MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
+
+} // namespace
+
+const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
TransactionParticipant* TransactionParticipant::get(OperationContext* opCtx) {
auto session = OperationContextSession::get(opCtx);
@@ -100,12 +289,9 @@ TransactionParticipant* TransactionParticipant::getFromNonCheckedOutSession(Sess
return &getTransactionParticipant(session);
}
-const Session* TransactionParticipant::_getSession() const {
- return getTransactionParticipant.owner(this);
-}
-
-Session* TransactionParticipant::_getSession() {
- return getTransactionParticipant.owner(this);
+const LogicalSessionId& TransactionParticipant::_sessionId() const {
+ const auto* owningSession = getTransactionParticipant.owner(this);
+ return owningSession->getSessionId();
}
void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) {
@@ -132,11 +318,11 @@ void TransactionParticipant::_continueMultiDocumentTransaction(WithLock wl, TxnN
txnNumber == _activeTxnNumber && !_txnState.isNone(wl));
if (_txnState.isInProgress(wl) && !_txnResourceStash) {
- // This indicates that the first command in the transaction failed but did not
- // implicitly abort the transaction. It is not safe to continue the transaction, in
- // particular because we have not saved the readConcern from the first statement of
- // the transaction.
+ // This indicates that the first command in the transaction failed but did not implicitly
+ // abort the transaction. It is not safe to continue the transaction, in particular because
+ // we have not saved the readConcern from the first statement of the transaction.
_abortTransactionOnSession(wl);
+
uasserted(ErrorCodes::NoSuchTransaction,
str::stream() << "Transaction " << txnNumber << " has been aborted.");
}
@@ -177,10 +363,15 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber,
boost::optional<bool> autocommit,
boost::optional<bool> startTransaction) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _checkValid(lg);
- if (auto newState = _getSession()->getLastRefreshState()) {
- _updateState(lg, *newState);
- }
+ uassert(ErrorCodes::TransactionTooOld,
+ str::stream() << "Cannot start transaction " << txnNumber << " on session "
+ << _sessionId()
+ << " because a newer transaction "
+ << _activeTxnNumber
+ << " has already started.",
+ txnNumber >= _activeTxnNumber);
// Requests without an autocommit field are interpreted as retryable writes. They cannot specify
// startTransaction, which is verified earlier when parsing the request.
@@ -201,14 +392,14 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber,
}
// Attempt to start a multi-statement transaction, which requires startTransaction be given as
- // an argument on the request. startTransaction can only be specified as true, which is verified
- // earlier when parsing the request.
+ // an argument on the request. The 'startTransaction' argument currently can only be specified
+ // as true, which is verified earlier, when parsing the request.
invariant(*startTransaction);
- // Servers in a sharded cluster can start a new transaction at the active transaction number to
- // allow internal retries by routers on re-targeting errors, like StaleShardVersion or
- // SnapshotTooOld.
if (txnNumber == _activeTxnNumber) {
+ // Servers in a sharded cluster can start a new transaction at the active transaction number
+ // to allow internal retries by routers on re-targeting errors, like
+ // StaleShard/DatabaseVersion or SnapshotTooOld.
uassert(ErrorCodes::ConflictingOperationInProgress,
"Only servers in a sharded cluster can start a new transaction at the active "
"transaction number",
@@ -235,7 +426,11 @@ void TransactionParticipant::beginOrContinue(TxnNumber txnNumber,
void TransactionParticipant::beginOrContinueTransactionUnconditionally(TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- // Continuing transaction unconditionally is a no-op since we don't check any on-disk state.
+
+ // We don't check or fetch any on-disk state, so treat the transaction as 'valid' for the
+ // purposes of this method and continue the transaction unconditionally
+ _isValid = true;
+
if (_activeTxnNumber != txnNumber) {
_beginMultiDocumentTransaction(lg, txnNumber);
}
@@ -450,6 +645,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// Always check session's txnNumber and '_txnState', since they can be modified by session
// kill and migration, which do not check out the session.
_checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false);
+
// If this is not a multi-document transaction, there is nothing to unstash.
if (_txnState.isNone(lg)) {
invariant(!_txnResourceStash);
@@ -525,15 +721,11 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
boost::optional<repl::OpTime> prepareOptime) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
+
// Always check session's txnNumber and '_txnState', since they can be modified by
// session kill and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
- _getSession()->lockTxnNumber(
- _activeTxnNumber,
- {ErrorCodes::PreparedTransactionInProgress,
- "cannot change transaction number while the session has a prepared transaction"});
-
ScopeGuard abortGuard = MakeGuard([&] {
// Prepare transaction on secondaries should always succeed.
invariant(!prepareOptime);
@@ -548,8 +740,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
// It is illegal for aborting a prepared transaction to fail for any reason, so we crash
// instead.
severe() << "Caught exception during abort of prepared transaction "
- << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON()
- << ": " << exceptionToStatus();
+ << opCtx->getTxnNumber() << " on " << _sessionId().toBSON() << ": "
+ << exceptionToStatus();
std::terminate();
}
});
@@ -739,14 +931,12 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx,
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
_finishCommitTransaction(lk, opCtx);
- _getSession()->unlockTxnNumber();
-
} catch (...) {
// It is illegal for committing a prepared transaction to fail for any reason, other than an
// invalid command, so we crash instead.
severe() << "Caught exception during commit of prepared transaction "
- << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON()
- << ": " << exceptionToStatus();
+ << opCtx->getTxnNumber() << " on " << _sessionId().toBSON() << ": "
+ << exceptionToStatus();
std::terminate();
}
}
@@ -766,7 +956,7 @@ void TransactionParticipant::_commitStorageTransaction(OperationContext* opCtx)
} catch (...) {
// It is illegal for committing a storage-transaction to fail so we crash instead.
severe() << "Caught exception during commit of storage-transaction " << opCtx->getTxnNumber()
- << " on " << _getSession()->getSessionId().toBSON() << ": " << exceptionToStatus();
+ << " on " << _sessionId().toBSON() << ": " << exceptionToStatus();
std::terminate();
}
@@ -825,7 +1015,7 @@ void TransactionParticipant::abortArbitraryTransactionIfExpired() {
return;
}
- const auto session = _getSession();
+ const auto* session = getTransactionParticipant.owner(this);
auto currentOperation = session->getCurrentOperation();
if (currentOperation) {
// If an operation is still running for this transaction when it expires, kill the currently
@@ -879,8 +1069,8 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction(
} catch (...) {
// It is illegal for this to throw so we catch and log this here for diagnosability.
severe() << "Caught exception during transaction " << opCtx->getTxnNumber()
- << " abort or stash on " << _getSession()->getSessionId().toBSON() << " in state "
- << _txnState << ": " << exceptionToStatus();
+ << " abort or stash on " << _sessionId().toBSON() << " in state " << _txnState << ": "
+ << exceptionToStatus();
std::terminate();
}
@@ -997,8 +1187,6 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_prepareOpTime = repl::OpTime();
_oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
-
- _getSession()->unlockTxnNumber();
}
void TransactionParticipant::_cleanUpTxnResourceOnOpCtx(
@@ -1027,28 +1215,18 @@ void TransactionParticipant::_cleanUpTxnResourceOnOpCtx(
void TransactionParticipant::_checkIsActiveTransaction(WithLock wl,
const TxnNumber& requestTxnNumber,
bool checkAbort) const {
- const auto txnNumber = _getSession()->getActiveTxnNumber();
- uassert(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "Cannot perform operations on active transaction " << _activeTxnNumber
- << " on session "
- << _getSession()->getSessionId()
- << " because a different transaction "
- << txnNumber
- << " is now active.",
- txnNumber == _activeTxnNumber);
-
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Cannot perform operations on requested transaction "
<< requestTxnNumber
<< " on session "
- << _getSession()->getSessionId()
+ << _sessionId()
<< " because a different transaction "
<< _activeTxnNumber
<< " is now active.",
requestTxnNumber == _activeTxnNumber);
uassert(ErrorCodes::NoSuchTransaction,
- str::stream() << "Transaction " << txnNumber << " has been aborted.",
+ str::stream() << "Transaction " << _activeTxnNumber << " has been aborted.",
!checkAbort || !_txnState.isAborted(wl));
}
@@ -1098,7 +1276,7 @@ void TransactionParticipant::reportStashedState(BSONObjBuilder* builder) const {
{
BSONObjBuilder lsid(builder->subobjStart("lsid"));
- _getSession()->getSessionId().serialize(&lsid);
+ _sessionId().serialize(&lsid);
}
BSONObjBuilder transactionBuilder;
@@ -1238,21 +1416,6 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl,
builder, readConcernArgs, tickSource, tickSource->getTicks());
}
-void TransactionParticipant::_updateState(WithLock wl, const Session::RefreshState& newState) {
- if (newState.refreshCount <= _lastStateRefreshCount) {
- return;
- }
-
- _activeTxnNumber = newState.txnNumber;
- if (newState.isCommitted) {
- _txnState.transitionTo(wl,
- TransactionState::kCommitted,
- TransactionState::TransitionValidation::kRelaxTransitionValidation);
- }
-
- _lastStateRefreshCount = newState.refreshCount;
-}
-
std::string TransactionParticipant::_transactionInfoForLog(
const SingleThreadedLockStats* lockStats,
TransactionState::StateFlag terminationCause,
@@ -1267,7 +1430,7 @@ std::string TransactionParticipant::_transactionInfoForLog(
BSONObjBuilder parametersBuilder;
BSONObjBuilder lsidBuilder(parametersBuilder.subobjStart("lsid"));
- _getSession()->getSessionId().serialize(&lsidBuilder);
+ _sessionId().serialize(&lsidBuilder);
lsidBuilder.doneFast();
parametersBuilder.append("txnNumber", _activeTxnNumber);
@@ -1329,18 +1492,14 @@ void TransactionParticipant::_logSlowTransaction(WithLock wl,
}
}
-void TransactionParticipant::checkForNewTxnNumber() {
- auto txnNumber = _getSession()->getActiveTxnNumber();
-
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- if (txnNumber > _activeTxnNumber) {
- _setNewTxnNumber(lg, txnNumber);
- }
-}
-
void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber) {
- invariant(!_txnState.isInSet(
- wl, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare));
+ uassert(ErrorCodes::PreparedTransactionInProgress,
+ "Cannot change transaction number while the session has a prepared transaction",
+ !_txnState.isInSet(
+ wl, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare));
+
+ LOG_FOR_TRANSACTION(4) << "New transaction started with txnNumber: " << txnNumber
+ << " on session with lsid " << _sessionId().getId();
// Abort the existing transaction if it's not prepared, committed, or aborted.
if (_txnState.isInProgress(wl)) {
@@ -1348,16 +1507,357 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
}
_activeTxnNumber = txnNumber;
+
+ // Reset the retryable writes state
+ _activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
+
+ // Reset the transactional state
_txnState.transitionTo(wl, TransactionState::kNone);
- {
- stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
- _transactionMetricsObserver.resetSingleTransactionStats(txnNumber);
- }
_prepareOpTime = repl::OpTime();
_oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_multikeyPathInfo.clear();
_autoCommit = boost::none;
+
+ // Reset the transactions metrics
+ stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
+ _transactionMetricsObserver.resetSingleTransactionStats(txnNumber);
+}
+
+void TransactionParticipant::refreshFromStorageIfNeeded(OperationContext* opCtx) {
+ if (opCtx->getClient()->isInDirectClient()) {
+ return;
+ }
+
+ invariant(!opCtx->lockState()->isLocked());
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ while (!_isValid) {
+ const int numInvalidations = _numInvalidations;
+
+ ul.unlock();
+
+ auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId());
+
+ ul.lock();
+
+ // Protect against concurrent refreshes or invalidations
+ if (!_isValid && _numInvalidations == numInvalidations) {
+ _isValid = true;
+ _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
+
+ if (_lastWrittenSessionRecord) {
+ _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
+ _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
+ _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
+
+ if (activeTxnHistory.transactionCommitted) {
+ _txnState.transitionTo(
+ ul,
+ TransactionState::kCommitted,
+ TransactionState::TransitionValidation::kRelaxTransitionValidation);
+ }
+ }
+
+ break;
+ }
+ }
+}
+
+void TransactionParticipant::onWriteOpCompletedOnPrimary(
+ OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ // Sanity check that we don't double-execute statements
+ for (const auto stmtId : stmtIdsWritten) {
+ const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
+ if (stmtOpTime) {
+ fassertOnRepeatedExecution(
+ _sessionId(), txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
+ }
+ }
+
+ const auto updateRequest =
+ _makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate, txnState);
+
+ ul.unlock();
+
+ repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
+
+ updateSessionEntry(opCtx, updateRequest);
+ _registerUpdateCacheOnCommit(
+ opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+}
+
+bool TransactionParticipant::onMigrateBeginOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ StmtId stmtId) {
+ beginOrContinue(txnNumber, boost::none, boost::none);
+
+ try {
+ if (checkStatementExecuted(opCtx, txnNumber, stmtId)) {
+ return false;
+ }
+ } catch (const DBException& ex) {
+ // If the transaction chain was truncated on the recipient shard, then we
+ // are most likely copying from a session that hasn't been touched on the
+ // recipient shard for a very long time but could be recent on the donor.
+ // We continue copying regardless to get the entire transaction from the donor.
+ if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
+ throw;
+ }
+ if (stmtId == kIncompleteHistoryStmtId) {
+ return false;
+ }
+ }
+
+ return true;
+}
+
+void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t oplogLastStmtIdWriteDate) {
+ invariant(opCtx->lockState()->inAWriteUnitOfWork());
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+
+ _checkValid(ul);
+ _checkIsActiveTransaction(ul, txnNumber);
+
+ // We do not migrate transaction oplog entries.
+ auto txnState = boost::none;
+ const auto updateRequest = _makeUpdateRequest(
+ ul, txnNumber, lastStmtIdWriteOpTime, oplogLastStmtIdWriteDate, txnState);
+
+ ul.unlock();
+
+ repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
+
+ updateSessionEntry(opCtx, updateRequest);
+ _registerUpdateCacheOnCommit(
+ opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
+}
+
+void TransactionParticipant::invalidate() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ uassert(ErrorCodes::PreparedTransactionInProgress,
+ "Cannot invalidate prepared transaction",
+ !_txnState.isInSet(
+ lg, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare));
+
+ _isValid = false;
+ _numInvalidations++;
+
+ _lastWrittenSessionRecord.reset();
+
+ _activeTxnNumber = kUninitializedTxnNumber;
+ _activeTxnCommittedStatements.clear();
+ _hasIncompleteHistory = false;
+}
+
+repl::OpTime TransactionParticipant::getLastWriteOpTime(TxnNumber txnNumber) const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _checkValid(lg);
+ _checkIsActiveTransaction(lg, txnNumber);
+
+ if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
+ return {};
+
+ return _lastWrittenSessionRecord->getLastWriteOpTime();
+}
+
+boost::optional<repl::OplogEntry> TransactionParticipant::checkStatementExecuted(
+ OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) const {
+ const auto stmtTimestamp = [&] {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _checkStatementExecuted(lg, txnNumber, stmtId);
+ }();
+
+ if (!stmtTimestamp)
+ return boost::none;
+
+ TransactionHistoryIterator txnIter(*stmtTimestamp);
+ while (txnIter.hasNext()) {
+ const auto entry = txnIter.next(opCtx);
+ invariant(entry.getStatementId());
+ if (*entry.getStatementId() == stmtId)
+ return entry;
+ }
+
+ MONGO_UNREACHABLE;
+}
+
+bool TransactionParticipant::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber,
+ StmtId stmtId) const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return bool(_checkStatementExecuted(lg, txnNumber, stmtId));
+}
+
+void TransactionParticipant::_checkValid(WithLock) const {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Session " << _sessionId()
+ << " was concurrently modified and the operation must be retried.",
+ _isValid);
+}
+
+void TransactionParticipant::_checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const {
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ str::stream() << "Cannot perform operations on transaction " << txnNumber
+ << " on session "
+ << _sessionId()
+ << " because a different transaction "
+ << _activeTxnNumber
+ << " is now active.",
+ txnNumber == _activeTxnNumber);
+}
+
+boost::optional<repl::OpTime> TransactionParticipant::_checkStatementExecuted(WithLock wl,
+ TxnNumber txnNumber,
+ StmtId stmtId) const {
+ _checkValid(wl);
+ _checkIsActiveTransaction(wl, txnNumber);
+
+ const auto it = _activeTxnCommittedStatements.find(stmtId);
+ if (it == _activeTxnCommittedStatements.end()) {
+ uassert(ErrorCodes::IncompleteTransactionHistory,
+ str::stream() << "Incomplete history detected for transaction " << txnNumber
+ << " on session "
+ << _sessionId(),
+ !_hasIncompleteHistory);
+
+ return boost::none;
+ }
+
+ invariant(_lastWrittenSessionRecord);
+ invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
+
+ return it->second;
+}
+
+Date_t TransactionParticipant::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
+ _checkValid(wl);
+ _checkIsActiveTransaction(wl, txnNumber);
+
+ if (!_lastWrittenSessionRecord || _lastWrittenSessionRecord->getTxnNum() != txnNumber)
+ return {};
+
+ return _lastWrittenSessionRecord->getLastWriteDate();
+}
+
+UpdateRequest TransactionParticipant::_makeUpdateRequest(
+ WithLock,
+ TxnNumber newTxnNumber,
+ const repl::OpTime& newLastWriteOpTime,
+ Date_t newLastWriteDate,
+ boost::optional<DurableTxnStateEnum> newState) const {
+ UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);
+
+ const auto updateBSON = [&] {
+ SessionTxnRecord newTxnRecord;
+ newTxnRecord.setSessionId(_sessionId());
+ newTxnRecord.setTxnNum(newTxnNumber);
+ newTxnRecord.setLastWriteOpTime(newLastWriteOpTime);
+ newTxnRecord.setLastWriteDate(newLastWriteDate);
+ newTxnRecord.setState(newState);
+ return newTxnRecord.toBSON();
+ }();
+ updateRequest.setUpdates(updateBSON);
+ updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId().toBSON()));
+ updateRequest.setUpsert(true);
+
+ return updateRequest;
+}
+
+void TransactionParticipant::_registerUpdateCacheOnCommit(
+ OperationContext* opCtx,
+ TxnNumber newTxnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime) {
+ opCtx->recoveryUnit()->onCommit(
+ [ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
+ boost::optional<Timestamp>) {
+ RetryableWritesStats::get(getGlobalServiceContext())
+ ->incrementTransactionsCollectionWriteCount();
+
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ if (!_isValid)
+ return;
+
+ // The cache of the last written record must always be advanced after a write so that
+ // subsequent writes have the correct point to start from.
+ if (!_lastWrittenSessionRecord) {
+ _lastWrittenSessionRecord.emplace();
+
+ _lastWrittenSessionRecord->setSessionId(_sessionId());
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
+ } else {
+ if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
+ _lastWrittenSessionRecord->setTxnNum(newTxnNumber);
+
+ if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
+ _lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
+ }
+
+ if (newTxnNumber > _activeTxnNumber) {
+ // This call is necessary in order to advance the txn number and reset the cached
+ // state in the case where just before the storage transaction commits, the cache
+ // entry gets invalidated and immediately refreshed while there were no writes for
+ // newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
+ // and we will fail to update the cache even though the write was successful.
+ _beginOrContinueRetryableWrite(lg, newTxnNumber);
+ }
+
+ if (newTxnNumber == _activeTxnNumber) {
+ for (const auto stmtId : stmtIdsWritten) {
+ if (stmtId == kIncompleteHistoryStmtId) {
+ _hasIncompleteHistory = true;
+ continue;
+ }
+
+ const auto insertRes =
+ _activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
+ if (!insertRes.second) {
+ const auto& existingOpTime = insertRes.first->second;
+ fassertOnRepeatedExecution(_sessionId(),
+ newTxnNumber,
+ stmtId,
+ existingOpTime,
+ lastStmtIdWriteOpTime);
+ }
+ }
+ }
+ });
+
+ MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
+ const auto& data = customArgs.getData();
+
+ const auto closeConnectionElem = data["closeConnection"];
+ if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
+ opCtx->getClient()->session()->end();
+ }
+
+ const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
+ if (!failBeforeCommitExceptionElem.eoo()) {
+ const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
+ uasserted(failureCode,
+ str::stream() << "Failing write for " << _sessionId() << ":" << newTxnNumber
+ << " due to failpoint. The write must not be reflected.");
+ }
+ }
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 9bdefa70dec..29e9b3e8bfe 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -36,15 +36,18 @@
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
+#include "mongo/db/ops/update_request.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/session.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/single_transaction_stats.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/transaction_metrics_observer.h"
+#include "mongo/stdx/unordered_map.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/decorable.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -139,6 +142,10 @@ public:
OperationContext* _opCtx;
};
+ using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
+
+ static const BSONObj kDeadEndSentinel;
+
TransactionParticipant() = default;
/**
@@ -353,22 +360,44 @@ public:
}
/**
- * Starts a new transaction, or continues an already active transaction.
+ * Starts a new transaction (and if the txnNumber is newer aborts any in-progress transaction on
+ * the session), or continues an already active transaction.
+ *
+ * 'autocommit' comes from the 'autocommit' field in the original client request. The only valid
+ * values are boost::none (meaning no autocommit was specified) and false (meaning that this is
+ * the beginning of a multi-statement transaction).
+ *
+ * 'startTransaction' comes from the 'startTransaction' field in the original client request.
+ * See below for the acceptable values and the meaning of the combinations of autocommit and
+ * startTransaction.
*
- * The 'autocommit' argument represents the value of the field given in the original client
- * request. If it is boost::none, no autocommit parameter was passed into the request. Every
- * operation that is part of a multi statement transaction must specify 'autocommit=false'.
- * 'startTransaction' represents the value of the field given in the original client request,
- * and indicates whether this operation is the beginning of a multi-statement transaction.
+ * autocommit = boost::none, startTransaction = boost::none: Means retryable write
+ * autocommit = false, startTransaction = boost::none: Means continuation of a multi-statement
+ * transaction
+ * autocommit = false, startTransaction = true: Means abort whatever transaction is in progress
+ * on the session and start a new transaction
*
- * Throws an exception if:
- * - The values of 'autocommit' and/or 'startTransaction' are inconsistent with the current
- * state of the transaction.
+ * Any combination other than the ones listed above will invariant since it is expected that the
+ * caller has performed the necessary customer input validations.
+ *
+ * Exceptions of note, which can be thrown are:
+ * - TransactionTooOld - if attempt is made to start a transaction older than the currently
+ * active one or the last one which committed
+ * - PreparedTransactionInProgress - if the transaction is in the prepared state and a new
+ * transaction or retryable write is attempted
*/
void beginOrContinue(TxnNumber txnNumber,
boost::optional<bool> autocommit,
boost::optional<bool> startTransaction);
+ /**
+ * Used only by the secondary oplog application logic. Equivalent to 'beginOrContinue(txnNumber,
+ * false, true)' without performing any checks for whether the new txnNumber will start a
+ * transaction number in the past.
+ *
+ * NOTE: This method assumes that there are no concurrent users of the transaction since it
+ * unconditionally changes the active transaction on the session.
+ */
void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber);
void transitionToPreparedforTest() {
@@ -382,10 +411,101 @@ public:
}
/**
- * Checks to see if the txnNumber changed in the parent session and perform the necessary
- * cleanup.
+ * Blocking method, which loads the transaction state from storage if it has been marked as
+ * needing refresh.
+ *
+ * In order to avoid the possibility of deadlock, this method must not be called while holding a
+ * lock.
+ */
+ void refreshFromStorageIfNeeded(OperationContext* opCtx);
+
+ TxnNumber getActiveTxnNumber() const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _activeTxnNumber;
+ }
+
+ /**
+ * Called after a write under the specified transaction completes while the node is a primary
+ * and specifies the statement ids which were written. Must be called while the caller is still
+ * in the write's WUOW. Updates the on-disk state of the session to match the specified
+ * transaction/opTime and keeps the cached state in sync.
+ *
+ * 'txnState' is 'none' for retryable writes.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState);
+
+ /**
+ * Helper function to begin a migration on a primary node.
+ *
+ * Returns whether the specified statement should be migrated at all or skipped.
+ *
+ * Not called with session checked out.
+ */
+ bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId);
+
+ /**
+ * Called after an entry for the specified session and transaction has been written to the oplog
+ * during chunk migration, while the node is still primary. Must be called while the caller is
+ * still in the oplog write's WUOW. Updates the on-disk state of the session to match the
+ * specified transaction/opTime and keeps the cached state in sync.
+ *
+ * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary
+ * and doesn't require the session to be checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number is newer than the
+ * one specified.
+ */
+ void onMigrateCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t oplogLastStmtIdWriteDate);
+
+ /**
+ * Marks the session as requiring refresh. Used when the session state has been modified
+ * externally, such as through a direct write to the transactions table.
+ */
+ void invalidate();
+
+ /**
+ * Returns the op time of the last committed write for this session and transaction. If no write
+ * has completed yet, returns an empty timestamp.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const;
+
+ /**
+ * Checks whether the given statementId for the specified transaction has already executed and
+ * if so, returns the oplog entry which was generated by that write. If the statementId hasn't
+ * executed, returns boost::none.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ StmtId stmtId) const;
+
+ /**
+ * Checks whether the given statementId for the specified transaction has already executed
+ * without fetching the oplog entry which was generated by that write.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
*/
- void checkForNewTxnNumber();
+ bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const;
private:
/**
@@ -503,6 +623,43 @@ private:
return (s << txnState.toString());
}
+ // Shortcut to obtain the id of the session under which this participant runs
+ const LogicalSessionId& _sessionId() const;
+
+ /**
+ * Performing any checks based on the in-memory state of the TransactionParticipant requires
+ * that the object is fully in sync with its on-disk representation in the transactions table.
+ * This method checks that. The object can be out of sync with the on-disk representation either
+ * when it was just created, or after invalidate() was called (which typically happens after a
+ * direct write to the transactions table).
+ */
+ void _checkValid(WithLock) const;
+
+ // Checks that the specified transaction number is the same as the activeTxnNumber. Effectively
+ // a check that the caller operates on the transaction it thinks it is operating on.
+ void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const;
+
+ boost::optional<repl::OpTime> _checkStatementExecuted(WithLock,
+ TxnNumber txnNumber,
+ StmtId stmtId) const;
+
+ // Returns the write date of the last committed write for this session and transaction. If no
+ // write has completed yet, returns an empty date.
+ //
+ // Throws if the session has been invalidated or the active transaction number doesn't match.
+ Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const;
+
+ UpdateRequest _makeUpdateRequest(WithLock,
+ TxnNumber newTxnNumber,
+ const repl::OpTime& newLastWriteOpTime,
+ Date_t newLastWriteDate,
+ boost::optional<DurableTxnStateEnum> newState) const;
+
+ void _registerUpdateCacheOnCommit(OperationContext* opCtx,
+ TxnNumber newTxnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteTs);
+
// Finishes committing the multi-document transaction after the storage-transaction has been
// committed, the oplog entry has been inserted into the oplog, and the transactions table has
// been updated.
@@ -562,8 +719,6 @@ private:
BSONObjBuilder* builder,
repl::ReadConcernArgs readConcernArgs) const;
- void _updateState(WithLock wl, const Session::RefreshState& newState);
-
// Bumps up the transaction number of this transaction and perform the necessary cleanup.
void _setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber);
@@ -577,10 +732,6 @@ private:
// number.
void _continueMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber);
- // Returns the session that this transaction belongs to.
- const Session* _getSession() const;
- Session* _getSession();
-
// Protects the member variables below.
mutable stdx::mutex _mutex;
@@ -599,10 +750,9 @@ private:
// Total size in bytes of all operations within the _transactionOperations vector.
size_t _transactionOperationBytes = 0;
- // This is the txnNumber that this transaction is actively working on. It can be different from
- // the current txnNumber of the parent session (since it can be changed in couple of ways, like
- // migration). In which case, it should make the necessary steps to also bump this number, like
- // aborting the current transaction.
+ // Tracks the last seen txn number for the session and is always >= to the transaction number in
+ // the last written txn record. When it is > than that in the last written txn record, this
+ // means a new transaction has begun on the session, but it hasn't yet performed any writes.
TxnNumber _activeTxnNumber{kUninitializedTxnNumber};
// Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long
@@ -625,8 +775,28 @@ private:
std::vector<MultikeyPathInfo> _multikeyPathInfo;
- // Remembers the refresh count this object has read from Session.
- long long _lastStateRefreshCount{0};
+ //
+ // Retryable writes state
+ //
+
+ // Specifies whether the session information needs to be refreshed from storage
+ bool _isValid{false};
+
+ // Counter, incremented with each call to invalidate in order to discern invalidations, which
+ // happen during refresh
+ int _numInvalidations{0};
+
+ // Set to true if incomplete history is detected. For example, when the oplog to a write was
+ // truncated because it was too old.
+ bool _hasIncompleteHistory{false};
+
+ // Caches what is known to be the last written transaction record for the session
+ boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;
+
+ // For the active txn, tracks which statement ids have been committed and at which oplog
+ // opTime. Used for fast retryability check and retrieving the previous write's data without
+ // having to scan through the oplog.
+ CommittedStatementTimestampMap _activeTxnCommittedStatements;
// Protects _transactionMetricsObserver. The concurrency rules are that const methods on
// _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 193b219db61..0836660bba1 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -43,6 +43,7 @@
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog.h"
#include "mongo/db/stats/fill_locker_info.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
@@ -85,14 +86,34 @@ repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
class OpObserverMock : public OpObserverNoop {
public:
- void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override;
+ void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override {
+ ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
+ OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
+
+ uassert(ErrorCodes::OperationFailed,
+ "onTransactionPrepare() failed",
+ !onTransactionPrepareThrowsException);
+
+ onTransactionPrepareFn();
+ }
+
bool onTransactionPrepareThrowsException = false;
bool transactionPrepared = false;
stdx::function<void()> onTransactionPrepareFn = [this]() { transactionPrepared = true; };
void onTransactionCommit(OperationContext* opCtx,
boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) override;
+ boost::optional<Timestamp> commitTimestamp) override {
+ ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
+ OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
+
+ uassert(ErrorCodes::OperationFailed,
+ "onTransactionCommit() failed",
+ !onTransactionCommitThrowsException);
+
+ onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
+ }
+
bool onTransactionCommitThrowsException = false;
bool transactionCommitted = false;
stdx::function<void(boost::optional<OplogSlot>, boost::optional<Timestamp>)>
@@ -101,31 +122,7 @@ public:
boost::optional<Timestamp> commitTimestamp) { transactionCommitted = true; };
};
-void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
- ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
-
- uassert(ErrorCodes::OperationFailed,
- "onTransactionPrepare() failed",
- !onTransactionPrepareThrowsException);
-
- onTransactionPrepareFn();
-}
-
-void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
- boost::optional<OplogSlot> commitOplogEntryOpTime,
- boost::optional<Timestamp> commitTimestamp) {
- ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
- OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
-
- uassert(ErrorCodes::OperationFailed,
- "onTransactionCommit() failed",
- !onTransactionCommitThrowsException);
-
- onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
-}
-
-class SessionTest : public MockReplCoordServerFixture {
+class TransactionParticipantRetryableWritesTest : public MockReplCoordServerFixture {
protected:
void setUp() final {
MockReplCoordServerFixture::setUp();
@@ -194,13 +191,15 @@ protected:
repl::OpTime prevOpTime,
boost::optional<DurableTxnStateEnum> txnState) {
const auto uuid = UUID::gen();
- session->beginOrContinueTxn(opCtx(), txnNum);
+
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime =
logOp(opCtx(), kNss, uuid, session->getSessionId(), txnNum, stmtId, prevOpTime);
- session->onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState);
wuow.commit();
@@ -228,26 +227,29 @@ protected:
ASSERT(txnRecord.getState() == txnState);
ASSERT_EQ(txnState != boost::none,
txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
- ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
- session->invalidate();
- session->refreshFromStorageIfNeeded(opCtx());
- ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session);
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
+
+ txnParticipant->invalidate();
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
}
OpObserverMock* _opObserver = nullptr;
};
-TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
+TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 20;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
ASSERT_EQ(sessionId, session.getSessionId());
- ASSERT(session.getLastWriteOpTime(txnNum).isNull());
+ ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -256,13 +258,14 @@ TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
ASSERT(!cursor->more());
}
-TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
+TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 21;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none);
@@ -279,13 +282,15 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
ASSERT(!txnRecord.getState());
- ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum));
+ ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
}
-TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
+TEST_F(TransactionParticipantRetryableWritesTest,
+ StartingNewerTransactionUpdatesThePersistedSession) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none);
@@ -303,17 +308,18 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT_EQ(200, txnRecord.getTxnNum());
ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime());
ASSERT(!txnRecord.getState());
- ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
+ ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
- session.invalidate();
- session.refreshFromStorageIfNeeded(opCtx());
- ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
+ txnParticipant->invalidate();
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
}
-TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) {
+TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
assertTxnRecord(&session, 100, 0, firstOpTime, boost::none);
@@ -327,25 +333,26 @@ TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) {
assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none);
}
-TEST_F(SessionTest, StartingOldTxnShouldAssert) {
+TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 20;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum - 1),
+ ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNum - 1, boost::none, boost::none),
AssertionException,
ErrorCodes::TransactionTooOld);
- ASSERT(session.getLastWriteOpTime(txnNum).isNull());
+ ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
}
-TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
- const auto uuid = UUID::gen();
+TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
// Drop the transactions table
BSONObj dropResult;
@@ -354,84 +361,93 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult));
const TxnNumber txnNum = 21;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
+
+ const auto uuid = UUID::gen();
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(
+ ASSERT_THROWS(txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
AssertionException);
}
-TEST_F(SessionTest, CheckStatementExecuted) {
+TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000));
- ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
+ ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
+ ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none);
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000));
- ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
+ ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
+ ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none);
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
// Invalidate the session and ensure the statements still check out
- session.invalidate();
- session.refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->invalidate();
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
}
-TEST_F(SessionTest, CheckStatementExecutedForOldTransactionThrows) {
+TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum - 1, 0),
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum - 1, 0),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, CheckStatementExecutedForInvalidatedTransactionThrows) {
+TEST_F(TransactionParticipantRetryableWritesTest,
+ CheckStatementExecutedForInvalidatedTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.invalidate();
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->invalidate();
- ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), 100, 0),
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
- const auto uuid = UUID::gen();
+TEST_F(TransactionParticipantRetryableWritesTest,
+ WriteOpCompletedOnPrimaryForOldTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
+
+ const auto uuid = UUID::gen();
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
wuow.commit();
}
@@ -440,60 +456,64 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ ASSERT_THROWS_CODE(txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
}
-TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
- const auto uuid = UUID::gen();
+TEST_F(TransactionParticipantRetryableWritesTest,
+ WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
+ const auto uuid = UUID::gen();
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
- session.invalidate();
+ txnParticipant->invalidate();
- ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ ASSERT_THROWS_CODE(txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
- const auto uuid = UUID::gen();
+TEST_F(TransactionParticipantRetryableWritesTest,
+ WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
const TxnNumber txnNum = 100;
- session.beginOrContinueTxn(opCtx(), txnNum);
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
{
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
+ const auto uuid = UUID::gen();
const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
- session.invalidate();
+ txnParticipant->invalidate();
wuow.commit();
}
- session.refreshFromStorageIfNeeded(opCtx());
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0));
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0));
}
-TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) {
+TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) {
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 2;
@@ -546,22 +566,23 @@ TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) {
}
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
- ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0),
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0),
AssertionException,
ErrorCodes::IncompleteTransactionHistory);
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1));
- ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1));
+ ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2));
- ASSERT_THROWS_CODE(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 0),
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 0),
AssertionException,
ErrorCodes::IncompleteTransactionHistory);
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1));
- ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1));
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2));
}
-TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
+TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
const auto uuid = UUID::gen();
const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 2;
@@ -571,8 +592,9 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
osi.setTxnNumber(txnNum);
Session session(sessionId);
- session.refreshFromStorageIfNeeded(opCtx());
- session.beginOrContinueTxn(opCtx(), txnNum);
+ const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(&session);
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
auto firstOpTime = ([&]() {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
@@ -585,7 +607,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
kNss,
uuid,
BSON("x" << 1),
- &Session::kDeadEndSentinel,
+ &TransactionParticipant::kDeadEndSentinel,
false,
wallClockTime,
osi,
@@ -593,7 +615,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
{},
false /* prepare */,
OplogSlot());
- session.onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none);
wuow.commit();
@@ -614,7 +636,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
kNss,
uuid,
{},
- &Session::kDeadEndSentinel,
+ &TransactionParticipant::kDeadEndSentinel,
false,
wallClockTime,
osi,
@@ -623,30 +645,30 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
false /* prepare */,
OplogSlot());
- session.onWriteOpCompletedOnPrimary(
+ txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none);
wuow.commit();
}
{
- auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1);
+ auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1);
ASSERT_TRUE(oplog);
ASSERT_EQ(firstOpTime, oplog->getOpTime());
}
- ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
+ ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
// Should have the same behavior after loading state from storage.
- session.invalidate();
- session.refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->invalidate();
+ txnParticipant->refreshFromStorageIfNeeded(opCtx());
{
- auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1);
+ auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1);
ASSERT_TRUE(oplog);
ASSERT_EQ(firstOpTime, oplog->getOpTime());
}
- ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
+ ASSERT_THROWS(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2), AssertionException);
}
} // namespace
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 06ec5f5a00e..20282f83b25 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -227,24 +227,21 @@ protected:
void bumpTxnNumberFromDifferentOpCtx(const LogicalSessionId& sessionId, TxnNumber newTxnNum) {
auto func = [sessionId, newTxnNum](OperationContext* opCtx) {
-
auto session = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(session.get());
// Check that there is a transaction in progress with a lower txnNumber.
ASSERT(txnParticipant->inMultiDocumentTransaction());
- ASSERT_LT(session->getActiveTxnNumber(), newTxnNum);
+ ASSERT_LT(txnParticipant->getActiveTxnNumber(), newTxnNum);
// Check that the transaction has some operations, so we can ensure they are cleared.
ASSERT_GT(txnParticipant->transactionOperationsForTest().size(), 0u);
// Bump the active transaction number on the txnParticipant. This should clear all state
// from the previous transaction.
- session->beginOrContinueTxn(opCtx, newTxnNum);
- ASSERT_EQ(session->getActiveTxnNumber(), newTxnNum);
-
- txnParticipant->checkForNewTxnNumber();
+ txnParticipant->beginOrContinue(newTxnNum, boost::none, boost::none);
+ ASSERT_EQ(newTxnNum, txnParticipant->getActiveTxnNumber());
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT_EQ(txnParticipant->transactionOperationsForTest().size(), 0u);
};
@@ -1114,9 +1111,11 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
auto func = [&](OperationContext* newOpCtx) {
auto session = SessionCatalog::get(newOpCtx)->getOrCreateSession(
newOpCtx, *opCtx()->getLogicalSessionId());
+ auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
ASSERT_THROWS_CODE(
- session->onMigrateBeginOnPrimary(newOpCtx, *opCtx()->getTxnNumber() + 1, 1),
+ txnParticipant->beginOrContinue(*opCtx()->getTxnNumber() + 1, false, true),
AssertionException,
ErrorCodes::PreparedTransactionInProgress);
};