/**
* Copyright (C) 2017 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage
#include "mongo/platform/basic.h"
#include "mongo/db/session.h"
#include "mongo/db/catalog/index_catalog.h"
#include "mongo/db/commands/test_commands_enabled.h"
#include "mongo/db/concurrency/lock_state.h"
#include "mongo/db/concurrency/write_conflict_exception.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/index/index_access_method.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/ops/update.h"
#include "mongo/db/query/get_executor.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/retryable_writes_stats.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/transaction_history_iterator.h"
#include "mongo/stdx/memory.h"
#include "mongo/transport/transport_layer.h"
#include "mongo/util/fail_point_service.h"
#include "mongo/util/log.h"
#include "mongo/util/mongoutils/str.h"
#include "mongo/util/net/socket_utils.h"
namespace mongo {
// Server parameter that dictates the max number of milliseconds that any transaction lock request
// will wait for lock acquisition. If an operation provides a greater timeout in a lock request,
// maxTransactionLockRequestTimeoutMillis will override it. If this is set to a negative value, it
// is inactive and nothing will be overridden.
//
// 5 milliseconds will help avoid deadlocks, but will still allow fast-running metadata operations
// to run without aborting transactions.
//
// Within this file the value is read with load() and, when non-negative, installed on the
// operation's locker through setMaxLockTimeout().
MONGO_EXPORT_SERVER_PARAMETER(maxTransactionLockRequestTimeoutMillis, int, 5);
// Server parameter that dictates the lifetime given to each transaction.
// Transactions must eventually expire to preempt storage cache pressure immobilizing the system.
//
// Defaults to 60. The validator below rejects any new value smaller than 1 with BadValue. The
// value is added to a transaction's start time to compute its expiry date when a multi-document
// transaction is started.
MONGO_EXPORT_SERVER_PARAMETER(transactionLifetimeLimitSeconds, std::int32_t, 60)
->withValidator([](const auto& potentialNewValue) {
if (potentialNewValue < 1) {
return Status(ErrorCodes::BadValue,
"transactionLifetimeLimitSeconds must be greater than or equal to 1s");
}
return Status::OK();
});
namespace {
// The command names that are allowed in a multi-document transaction.
//
// These maps are used as sets: lookups in this file only test whether a key is present (find()
// compared against cend()), so the mapped value 1 is a placeholder. Both the "findandmodify" and
// "findAndModify" spellings are listed.
const StringMap txnCmdWhitelist = {{"abortTransaction", 1},
{"aggregate", 1},
{"commitTransaction", 1},
{"coordinateCommitTransaction", 1},
{"delete", 1},
{"distinct", 1},
{"doTxn", 1},
{"find", 1},
{"findandmodify", 1},
{"findAndModify", 1},
{"geoSearch", 1},
{"getMore", 1},
{"insert", 1},
{"killCursors", 1},
{"prepareTransaction", 1},
{"update", 1}};
// The command names that are allowed in a multi-document transaction only when test commands are
// enabled.
const StringMap txnCmdForTestingWhitelist = {{"dbHash", 1}};
// The commands that can be run on the 'admin' database in multi-document transactions. Any other
// command targeting 'admin' with 'autocommit' specified is rejected by
// Session::beginOrContinueTxn().
const StringMap txnAdminCommands = {{"abortTransaction", 1},
{"commitTransaction", 1},
{"coordinateCommitTransaction", 1},
{"doTxn", 1},
{"prepareTransaction", 1}};
/**
 * Reports that statement 'stmtId' of transaction 'txnNumber' on session 'lsid' was found committed
 * at two different opTimes. Logs the details at severe level and then calls fassertFailed(40526).
 */
void fassertOnRepeatedExecution(const LogicalSessionId& lsid,
                                TxnNumber txnNumber,
                                StmtId stmtId,
                                const repl::OpTime& firstOpTime,
                                const repl::OpTime& secondOpTime) {
    // Render the session id once up front so the log statement below reads linearly.
    const auto sessionIdDoc = lsid.toBSON();

    severe() << "Statement id " << stmtId
             << " from transaction [ " << sessionIdDoc << ":" << txnNumber
             << " ] was committed once with opTime " << firstOpTime
             << " and a second time with opTime " << secondOpTime
             << ". This indicates possible data corruption or server bug and the process will be "
             << "terminated.";

    fassertFailed(40526);
}
// Result of fetchActiveTransactionHistory(): what was found on disk for a session.
struct ActiveTransactionHistory {
// The session's document from the session transactions table; unset when no document exists.
boost::optional lastTxnRecord;
// Statement id -> opTime of the oplog entry at which that statement was written.
Session::CommittedStatementTimestampMap committedStatements;
// Set when an applyOps command entry was encountered while walking the history.
bool transactionCommitted{false};
// Set when the dead end sentinel entry was encountered, or when iterating the history failed
// with IncompleteTransactionHistory.
bool hasIncompleteHistory{false};
};
// Reads the persisted state for session 'lsid': its document in the session transactions table
// plus the statement history reachable from that document's last-write opTime.
//
// Returns a default-constructed ActiveTransactionHistory (no record, no statements, both flags
// false) when the table holds no document for this session. DBExceptions other than
// IncompleteTransactionHistory raised while iterating are propagated to the caller.
ActiveTransactionHistory fetchActiveTransactionHistory(OperationContext* opCtx,
const LogicalSessionId& lsid) {
ActiveTransactionHistory result;
// Look the session document up by its session id field. Note that the lambda's local 'result'
// (the raw query result) shadows the outer 'result'.
result.lastTxnRecord = [&]() -> boost::optional {
DBDirectClient client(opCtx);
auto result =
client.findOne(NamespaceString::kSessionTransactionsTableNamespace.ns(),
{BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())});
if (result.isEmpty()) {
return boost::none;
}
return SessionTxnRecord::parse(IDLParserErrorContext("parse latest txn record for session"),
result);
}();
if (!result.lastTxnRecord) {
return result;
}
// Walk the write history starting at the record's last-write opTime.
auto it = TransactionHistoryIterator(result.lastTxnRecord->getLastWriteOpTime());
while (it.hasNext()) {
try {
const auto entry = it.next(opCtx);
// Every entry in a session's write history is expected to carry a statement id.
invariant(entry.getStatementId());
if (*entry.getStatementId() == kIncompleteHistoryStmtId) {
// Only the dead end sentinel can have this id for oplog write history
invariant(entry.getObject2());
invariant(entry.getObject2()->woCompare(Session::kDeadEndSentinel) == 0);
result.hasIncompleteHistory = true;
continue;
}
// Record the statement. A failed emplace means the same statement id was already seen in
// this history, which is treated as fatal.
const auto insertRes =
result.committedStatements.emplace(*entry.getStatementId(), entry.getOpTime());
if (!insertRes.second) {
const auto& existingOpTime = insertRes.first->second;
fassertOnRepeatedExecution(lsid,
result.lastTxnRecord->getTxnNum(),
*entry.getStatementId(),
existingOpTime,
entry.getOpTime());
}
// applyOps oplog entry marks the commit of a transaction.
if (entry.isCommand() &&
entry.getCommandType() == repl::OplogEntry::CommandType::kApplyOps) {
result.transactionCommitted = true;
}
} catch (const DBException& ex) {
// An IncompleteTransactionHistory error ends the walk and is recorded in the result; any
// other error is rethrown.
if (ex.code() == ErrorCodes::IncompleteTransactionHistory) {
result.hasIncompleteHistory = true;
break;
}
throw;
}
}
return result;
}
// Applies 'updateRequest' to the session transactions collection inside its own WriteUnitOfWork.
// The request must be a replacement-style update whose query starts with the _id field.
//
// If no document with that _id exists the replacement document is inserted; otherwise the existing
// document is re-checked against the request's query and then replaced.
//
// Throws:
//  - uassert 40527 if the session transactions collection does not exist.
//  - uassert 40672 if the collection's _id index cannot be found.
//  - WriteConflictException if the insert hits DuplicateKey, or if the existing document no longer
//    matches the request's query.
//  - Any other non-OK status from the insert, via uassertStatusOK.
void updateSessionEntry(OperationContext* opCtx, const UpdateRequest& updateRequest) {
// Current code only supports replacement update.
dassert(UpdateDriver::isDocReplacement(updateRequest.getUpdates()));
AutoGetCollection autoColl(opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IX);
uassert(40527,
str::stream() << "Unable to persist transaction state because the session transaction "
"collection is missing. This indicates that the "
<< NamespaceString::kSessionTransactionsTableNamespace.ns()
<< " collection has been manually deleted.",
autoColl.getCollection());
WriteUnitOfWork wuow(opCtx);
auto collection = autoColl.getCollection();
auto idIndex = collection->getIndexCatalog()->findIdIndex(opCtx);
uassert(40672,
str::stream() << "Failed to fetch _id index for "
<< NamespaceString::kSessionTransactionsTableNamespace.ns(),
idIndex);
auto indexAccess = collection->getIndexCatalog()->getIndex(idIndex);
// Since we are looking up a key inside the _id index, create a key object consisting of only
// the _id field.
auto idToFetch = updateRequest.getQuery().firstElement();
auto toUpdateIdDoc = idToFetch.wrap();
dassert(idToFetch.fieldNameStringData() == "_id"_sd);
auto recordId = indexAccess->findSingle(opCtx, toUpdateIdDoc);
// Snapshot id captured right after the index lookup; it is attached to the original document
// handed to updateDocument() below.
auto startingSnapshotId = opCtx->recoveryUnit()->getSnapshotId();
if (recordId.isNull()) {
// Upsert case.
auto status = collection->insertDocument(
opCtx, InsertStatement(updateRequest.getUpdates()), nullptr, false);
// A DuplicateKey here means a document with this _id appeared after the lookup above; surface
// it as a write conflict.
if (status == ErrorCodes::DuplicateKey) {
throw WriteConflictException();
}
uassertStatusOK(status);
wuow.commit();
return;
}
auto originalRecordData = collection->getRecordStore()->dataFor(opCtx, recordId);
auto originalDoc = originalRecordData.toBson();
// The matcher below is built with a null collator, so the collection must not have one either.
invariant(collection->getDefaultCollator() == nullptr);
boost::intrusive_ptr expCtx(new ExpressionContext(opCtx, nullptr));
auto matcher =
fassert(40673, MatchExpressionParser::parse(updateRequest.getQuery(), std::move(expCtx)));
if (!matcher->matchesBSON(originalDoc)) {
// Document no longer match what we expect so throw WCE to make the caller re-examine.
throw WriteConflictException();
}
OplogUpdateEntryArgs args;
args.nss = NamespaceString::kSessionTransactionsTableNamespace;
args.uuid = collection->uuid();
args.update = updateRequest.getUpdates();
args.criteria = toUpdateIdDoc;
args.fromMigrate = false;
collection->updateDocument(opCtx,
recordId,
Snapshotted(startingSnapshotId, originalDoc),
updateRequest.getUpdates(),
false, // indexesAffected = false because _id is the only index
nullptr,
&args);
wuow.commit();
}
// Failpoint which allows different failure actions to happen after each write. Supports the
// parameters below, which can be combined with each other (unless explicitly disallowed):
//
// closeConnection (bool, default = true): Closes the connection on which the write was executed.
// failBeforeCommitExceptionCode (int, default = not specified): If set, the specified exception
// code will be thrown, which will cause the write to not commit; if not specified, the write
// will be allowed to commit.
//
// NOTE(review): the code that evaluates this fail point is not in the portion of the file
// reviewed here.
MONGO_FAIL_POINT_DEFINE(onPrimaryTransactionalWrite);
// Failpoint which will pause an operation just after allocating a point-in-time storage engine
// transaction. It is checked at the end of Session::unstashTransactionResources(), after
// preallocateSnapshot() has been called.
MONGO_FAIL_POINT_DEFINE(hangAfterPreallocateSnapshot);
} // namespace
// Marker document {$incompleteOplogHistory: 1}. fetchActiveTransactionHistory() requires that a
// history entry whose statement id is kIncompleteHistoryStmtId carries exactly this document as
// its second object.
const BSONObj Session::kDeadEndSentinel(BSON("$incompleteOplogHistory" << 1));
// Creates the in-memory session object for 'sessionId'. Nothing is loaded from storage here; see
// refreshFromStorageIfNeeded().
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
// Reloads this session's in-memory state from storage if it is currently marked invalid.
//
// Does nothing when called from within a direct client. Otherwise the caller must hold no locks and
// the operation's read concern level must be local (both enforced with invariants).
void Session::refreshFromStorageIfNeeded(OperationContext* opCtx) {
if (opCtx->getClient()->isInDirectClient()) {
return;
}
invariant(!opCtx->lockState()->isLocked());
invariant(repl::ReadConcernArgs::get(opCtx).getLevel() ==
repl::ReadConcernLevel::kLocalReadConcern);
stdx::unique_lock ul(_mutex);
while (!_isValid) {
// Remember the invalidation counter, then drop the mutex for the duration of the storage read.
const int numInvalidations = _numInvalidations;
ul.unlock();
auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId);
ul.lock();
// Protect against concurrent refreshes or invalidations
// The fetched state is installed only if the session is still invalid and no invalidation
// happened while the mutex was released; otherwise the loop condition is re-evaluated.
if (!_isValid && _numInvalidations == numInvalidations) {
_isValid = true;
_lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
if (_lastWrittenSessionRecord) {
_activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
_activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
_hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
if (activeTxnHistory.transactionCommitted) {
// When refreshing the state from storage, we relax transition validation since
// all states are valid next states and we do not want to pollute the state
// transition table for other callers.
_txnState.transitionTo(
ul,
TransitionTable::State::kCommitted,
TransitionTable::TransitionValidation::kRelaxTransitionValidation);
}
}
break;
}
}
}
// Validates that command 'cmdName' against database 'dbName' may run with the given transaction
// options, then begins or continues transaction 'txnNumber' under the session mutex.
//
// Does nothing when called from within a direct client. The caller must hold no locks.
//
// The three checks below apply only when 'autocommit' is specified; each throws
// OperationNotSupportedInTransaction:
//  - 'count' is rejected with a dedicated message.
//  - The command must be in txnCmdWhitelist, or in txnCmdForTestingWhitelist when test commands
//    are enabled.
//  - The database must not be 'config' or 'local'; 'admin' is allowed only for commands listed in
//    txnAdminCommands.
void Session::beginOrContinueTxn(OperationContext* opCtx,
TxnNumber txnNumber,
boost::optional autocommit,
boost::optional startTransaction,
StringData dbName,
StringData cmdName) {
if (opCtx->getClient()->isInDirectClient()) {
return;
}
invariant(!opCtx->lockState()->isLocked());
uassert(ErrorCodes::OperationNotSupportedInTransaction,
"Cannot run 'count' in a multi-document transaction. Please see "
"http://dochub.mongodb.org/core/transaction-count for a recommended alternative.",
!autocommit || cmdName != "count"_sd);
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "Cannot run '" << cmdName << "' in a multi-document transaction.",
!autocommit || txnCmdWhitelist.find(cmdName) != txnCmdWhitelist.cend() ||
(getTestCommandsEnabled() &&
txnCmdForTestingWhitelist.find(cmdName) != txnCmdForTestingWhitelist.cend()));
uassert(ErrorCodes::OperationNotSupportedInTransaction,
str::stream() << "Cannot run command against the '" << dbName
<< "' database in a transaction",
!autocommit || (dbName != "config"_sd && dbName != "local"_sd &&
(dbName != "admin"_sd ||
txnAdminCommands.find(cmdName) != txnAdminCommands.cend())));
stdx::lock_guard lg(_mutex);
_beginOrContinueTxn(lg, txnNumber, autocommit, startTransaction);
}
/**
 * Migration entry point for beginning or continuing transaction 'txnNumber' on this session.
 *
 * Must not be called from within a direct client and must be called without any locks held; both
 * preconditions are enforced with invariants. The work itself is delegated to
 * _beginOrContinueTxnOnMigration() while holding the session mutex.
 */
void Session::beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber) {
    invariant(!opCtx->getClient()->isInDirectClient());
    invariant(!opCtx->lockState()->isLocked());

    stdx::lock_guard sessionLock(_mutex);
    _beginOrContinueTxnOnMigration(sessionLock, txnNumber);
}
/**
 * Points the operation's recovery unit at the last-applied snapshot, preallocates that snapshot,
 * and records the resulting read timestamp together with the current replication term as this
 * session's speculative transaction read opTime.
 *
 * The recovery unit must report a point-in-time read timestamp afterwards (invariant).
 */
void Session::setSpeculativeTransactionOpTimeToLastApplied(OperationContext* opCtx) {
    stdx::lock_guard sessionLock(_mutex);

    auto coordinator = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext());
    auto recoveryUnit = opCtx->recoveryUnit();

    recoveryUnit->setTimestampReadSource(RecoveryUnit::ReadSource::kLastAppliedSnapshot);
    recoveryUnit->preallocateSnapshot();

    auto snapshotTimestamp = recoveryUnit->getPointInTimeReadTimestamp();
    invariant(snapshotTimestamp);

    // Transactions do not survive term changes, so combining "getTerm" here with the
    // recovery unit timestamp does not cause races.
    _speculativeTransactionReadOpTime = {*snapshotTimestamp, coordinator->getTerm()};
}
// Called on a primary after statements 'stmtIdsWritten' of transaction 'txnNumber' were written.
// Persists the session's updated record and registers the matching in-memory cache update.
//
// Must be called inside a WriteUnitOfWork (invariant). If any of the statement ids is already
// recorded as executed for this transaction, the process is terminated through
// fassertOnRepeatedExecution().
void Session::onWriteOpCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
Date_t lastStmtIdWriteDate) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock ul(_mutex);
// Sanity check that we don't double-execute statements
for (const auto stmtId : stmtIdsWritten) {
const auto stmtOpTime = _checkStatementExecuted(ul, txnNumber, stmtId);
if (stmtOpTime) {
fassertOnRepeatedExecution(
_sessionId, txnNumber, stmtId, *stmtOpTime, lastStmtIdWriteOpTime);
}
}
const auto updateRequest =
_makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, lastStmtIdWriteDate);
// The session mutex is released before touching storage.
ul.unlock();
// The write to the session table below is performed with replication of writes disabled.
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
updateSessionEntry(opCtx, updateRequest);
_registerUpdateCacheOnCommit(
opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
/**
 * Begins or continues transaction 'txnNumber' for a migration and reports whether statement
 * 'stmtId' still has to be copied.
 *
 * Returns false when the statement is already recorded as executed on this session, or when the
 * session's history is incomplete and 'stmtId' is the incomplete-history sentinel id. Returns true
 * otherwise. Any DBException other than IncompleteTransactionHistory is propagated.
 */
bool Session::onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) {
    beginOrContinueTxnOnMigration(opCtx, txnNumber);

    try {
        const bool alreadyExecuted = bool(checkStatementExecuted(opCtx, txnNumber, stmtId));
        return !alreadyExecuted;
    } catch (const DBException& ex) {
        if (ex.code() != ErrorCodes::IncompleteTransactionHistory) {
            throw;
        }

        // If the transaction chain was truncated on the recipient shard, then we
        // are most likely copying from a session that hasn't been touched on the
        // recipient shard for a very long time but could be recent on the donor.
        // We continue copying regardless to get the entire transaction from the donor,
        // except for the sentinel statement itself.
        return stmtId != kIncompleteHistoryStmtId;
    }
}
// Called on a primary after migrated statements 'stmtIdsWritten' of transaction 'txnNumber' were
// written. Persists the session's updated record and registers the matching in-memory cache
// update.
//
// Must be called inside a WriteUnitOfWork (invariant). The session state is validated with
// _checkValid() and _checkIsActiveTransaction() before anything is written.
void Session::onMigrateCompletedOnPrimary(OperationContext* opCtx,
TxnNumber txnNumber,
std::vector stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime,
Date_t oplogLastStmtIdWriteDate) {
invariant(opCtx->lockState()->inAWriteUnitOfWork());
stdx::unique_lock ul(_mutex);
_checkValid(ul);
_checkIsActiveTransaction(ul, txnNumber, false);
// If the transaction has a populated lastWriteDate, we will use that as the most up-to-date
// value. Using the lastWriteDate from the oplog being migrated may move the lastWriteDate
// back. However, in the case that the transaction doesn't have the lastWriteDate populated,
// the oplog's value serves as a best-case fallback.
// Date_t::min() is the value treated as "not populated" here.
const auto txnLastStmtIdWriteDate = _getLastWriteDate(ul, txnNumber);
const auto updatedLastStmtIdWriteDate =
txnLastStmtIdWriteDate == Date_t::min() ? oplogLastStmtIdWriteDate : txnLastStmtIdWriteDate;
const auto updateRequest =
_makeUpdateRequest(ul, txnNumber, lastStmtIdWriteOpTime, updatedLastStmtIdWriteDate);
// The session mutex is released before touching storage.
ul.unlock();
// The write to the session table below is performed with replication of writes disabled.
repl::UnreplicatedWritesBlock doNotReplicateWrites(opCtx);
updateSessionEntry(opCtx, updateRequest);
_registerUpdateCacheOnCommit(
opCtx, txnNumber, std::move(stmtIdsWritten), lastStmtIdWriteOpTime);
}
/**
 * Marks the in-memory session state as invalid and discards everything cached from storage.
 *
 * The invalidation counter is bumped so that a refresh which was reading from storage while this
 * ran can detect that its result is stale (see refreshFromStorageIfNeeded()).
 */
void Session::invalidate() {
    stdx::lock_guard sessionLock(_mutex);

    _isValid = false;
    ++_numInvalidations;

    // Drop everything that was derived from the persisted session record.
    _lastWrittenSessionRecord.reset();
    _activeTxnNumber = kUninitializedTxnNumber;
    _activeTxnCommittedStatements.clear();
    _speculativeTransactionReadOpTime = repl::OpTime();
    _hasIncompleteHistory = false;
}
/**
 * Returns the last-write opTime stored in the session record for transaction 'txnNumber'.
 *
 * A default-constructed OpTime is returned when there is no session record or when the record
 * belongs to a different transaction number. The session state is validated with _checkValid()
 * and _checkIsActiveTransaction() first.
 */
repl::OpTime Session::getLastWriteOpTime(TxnNumber txnNumber) const {
    stdx::lock_guard sessionLock(_mutex);
    _checkValid(sessionLock);
    _checkIsActiveTransaction(sessionLock, txnNumber, false);

    if (_lastWrittenSessionRecord && _lastWrittenSessionRecord->getTxnNum() == txnNumber) {
        return _lastWrittenSessionRecord->getLastWriteOpTime();
    }

    return repl::OpTime();
}
// Returns the oplog entry recorded for statement 'stmtId' of transaction 'txnNumber', or
// boost::none if _checkStatementExecuted() has no opTime for that statement.
//
// The session mutex is held only while looking up the statement's opTime; the oplog history is then
// walked without it. Exceptions thrown by the history iterator are not caught here.
boost::optional Session::checkStatementExecuted(OperationContext* opCtx,
TxnNumber txnNumber,
StmtId stmtId) const {
const auto stmtTimestamp = [&] {
stdx::lock_guard lg(_mutex);
return _checkStatementExecuted(lg, txnNumber, stmtId);
}();
if (!stmtTimestamp)
return boost::none;
// Walk the history starting at the statement's recorded opTime until the entry carrying this
// statement id is found.
TransactionHistoryIterator txnIter(*stmtTimestamp);
while (txnIter.hasNext()) {
const auto entry = txnIter.next(opCtx);
invariant(entry.getStatementId());
if (*entry.getStatementId() == stmtId)
return entry;
}
// A statement with a recorded opTime is expected to be found by the walk above.
MONGO_UNREACHABLE;
}
/**
 * Returns whether statement 'stmtId' of transaction 'txnNumber' is recorded as executed, using
 * only the in-memory state (no oplog entry is fetched).
 */
bool Session::checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const {
    stdx::lock_guard sessionLock(_mutex);
    const auto recordedOpTime = _checkStatementExecuted(sessionLock, txnNumber, stmtId);
    return bool(recordedOpTime);
}
// Begins a new transaction / retryable write with number 'txnNumber', or continues the active one.
// The caller must hold the session mutex ('wl' is the witness).
//
// Throws:
//  - TransactionTooOld (from _checkTxnValid) if 'txnNumber' is lower than the active number.
//  - ConflictingOperationInProgress if 'startTransaction' is given for the active transaction.
//  - InvalidOptions if 'autocommit' is given while continuing a retryable write, or is not
//    exactly false while continuing a multi-statement transaction.
//  - NoSuchTransaction if the active transaction is in progress without stashed resources (it is
//    aborted first), or if a new multi-document transaction is requested without
//    'startTransaction'.
void Session::_beginOrContinueTxn(WithLock wl,
TxnNumber txnNumber,
boost::optional autocommit,
boost::optional startTransaction) {
// Check whether the session information needs to be refreshed from disk.
_checkValid(wl);
// Check if the given transaction number is valid for this session. The transaction number must
// be >= the active transaction number.
_checkTxnValid(wl, txnNumber);
//
// Continue an active transaction.
//
if (txnNumber == _activeTxnNumber) {
// It is never valid to specify 'startTransaction' on an active transaction.
uassert(ErrorCodes::ConflictingOperationInProgress,
str::stream() << "Cannot specify 'startTransaction' on transaction " << txnNumber
<< " since it is already in progress.",
startTransaction == boost::none);
// Continue a retryable write.
if (_txnState.isNone(wl)) {
uassert(ErrorCodes::InvalidOptions,
"Cannot specify 'autocommit' on an operation not inside a multi-statement "
"transaction.",
autocommit == boost::none);
return;
}
// Continue a multi-statement transaction. In this case, it is required that
// autocommit=false be given as an argument on the request. Retryable writes will have
// _autocommit=true, so that is why we verify that _autocommit=false here.
if (!_autocommit) {
uassert(
ErrorCodes::InvalidOptions,
"Must specify autocommit=false on all operations of a multi-statement transaction.",
autocommit == boost::optional(false));
if (_txnState.isInProgress(wl) && !_txnResourceStash) {
// This indicates that the first command in the transaction failed but did not
// implicitly abort the transaction. It is not safe to continue the transaction, in
// particular because we have not saved the readConcern from the first statement of
// the transaction.
_abortTransaction(wl);
uasserted(ErrorCodes::NoSuchTransaction,
str::stream() << "Transaction " << txnNumber << " has been aborted.");
}
}
return;
}
//
// Start a new transaction.
//
// At this point, the given transaction number must be > _activeTxnNumber. Existence of an
// 'autocommit' field means we interpret this operation as part of a multi-document transaction.
invariant(txnNumber > _activeTxnNumber);
if (autocommit) {
// Start a multi-document transaction.
invariant(*autocommit == false);
uassert(ErrorCodes::NoSuchTransaction,
str::stream() << "Given transaction number " << txnNumber
<< " does not match any in-progress transactions.",
startTransaction != boost::none);
_setActiveTxn(wl, txnNumber);
_autocommit = false;
_txnState.transitionTo(wl, TransitionTable::State::kInProgress);
// Tracks various transactions metrics.
_singleTransactionStats = SingleTransactionStats();
_singleTransactionStats->setStartTime(curTimeMicros64());
// The start time is recorded in microseconds; dividing by 1000 yields the milliseconds expected
// by fromMillisSinceEpoch(). The expiry date is the start time plus the configured lifetime.
_transactionExpireDate =
Date_t::fromMillisSinceEpoch(_singleTransactionStats->getStartTime() / 1000) +
stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()};
ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementTotalStarted();
ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentOpen();
} else {
// Execute a retryable write.
invariant(startTransaction == boost::none);
_setActiveTxn(wl, txnNumber);
_autocommit = true;
_txnState.transitionTo(wl, TransitionTable::State::kNone);
// SingleTransactionStats are only for multi-document transactions.
_singleTransactionStats = boost::none;
}
// A freshly started transaction or retryable write must not have buffered operations.
invariant(_transactionOperations.empty());
}
// Throws TransactionTooOld unless 'txnNumber' is greater than or equal to the session's active
// transaction number. The caller must hold the session mutex.
void Session::_checkTxnValid(WithLock, TxnNumber txnNumber) const {
uassert(ErrorCodes::TransactionTooOld,
str::stream() << "Cannot start transaction " << txnNumber << " on session "
<< getSessionId()
<< " because a newer transaction "
<< _activeTxnNumber
<< " has already started.",
txnNumber >= _activeTxnNumber);
}
// Moves the transaction resources off 'opCtx' and into this object: the released state of the
// operation's WriteUnitOfWork, its locker, its recovery unit and a copy of its read concern
// arguments. 'opCtx' is left with no WriteUnitOfWork, a new locker and a new recovery unit.
//
// 'opCtx' must currently have a WriteUnitOfWork; it is dereferenced unconditionally.
Session::TxnResources::TxnResources(OperationContext* opCtx) {
_ruState = opCtx->getWriteUnitOfWork()->release();
opCtx->setWriteUnitOfWork(nullptr);
// Swap a new locker onto the operation and keep the transaction's locker here, releasing its
// ticket and clearing its thread id while it is held.
_locker = opCtx->swapLockState(stdx::make_unique());
_locker->releaseTicket();
_locker->unsetThreadId();
// This thread must still respect the transaction lock timeout, since it can prevent the
// transaction from making progress.
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
if (maxTransactionLockMillis >= 0) {
opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
}
// Take ownership of the transaction's recovery unit and give the operation a new one that is
// not in a unit of work.
_recoveryUnit = std::unique_ptr(opCtx->releaseRecoveryUnit());
opCtx->setRecoveryUnit(opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit(),
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
_readConcernArgs = repl::ReadConcernArgs::get(opCtx);
}
/**
 * Aborts the stashed unit of work if these resources were never released back onto an operation
 * context and still own a recovery unit. Otherwise does nothing.
 */
Session::TxnResources::~TxnResources() {
    const bool nothingToAbort = _released || !_recoveryUnit;
    if (nothingToAbort) {
        return;
    }

    // This should only be reached when aborting a transaction that isn't active, i.e.
    // when starting a new transaction before completing an old one. So we should
    // be at WUOW nesting level 1 (only the top level WriteUnitOfWork).
    _locker->endWriteUnitOfWork();
    invariant(!_locker->inAWriteUnitOfWork());
    _recoveryUnit->abortUnitOfWork();
}
// Moves the stashed resources back onto 'opCtx': the locker, the recovery unit, a WriteUnitOfWork
// resumed from the saved state, and the saved read concern arguments.
//
// May only be called once per TxnResources (invariant on '_released'). The locker currently on
// 'opCtx' must be inactive (invariant).
void Session::TxnResources::release(OperationContext* opCtx) {
// Perform operations that can fail the release before marking the TxnResources as released.
_locker->reacquireTicket(opCtx);
invariant(!_released);
_released = true;
// We intentionally do not capture the return value of swapLockState(), which is just an empty
// locker. At the end of the operation, if the transaction is not complete, we will stash the
// operation context's locker and replace it with a new empty locker.
invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive);
opCtx->swapLockState(std::move(_locker));
opCtx->lockState()->updateThreadIdToCurrentThread();
// Hand the recovery unit back first, then recreate the WriteUnitOfWork from the state saved when
// the resources were stashed.
opCtx->setRecoveryUnit(_recoveryUnit.release(),
WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx, _ruState));
// Overwrite the operation's read concern arguments with the ones saved at stash time.
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
readConcernArgs = _readConcernArgs;
}
/**
 * If 'opCtx' currently has a WriteUnitOfWork, moves its transaction resources into this block so
 * that they can be restored by the destructor. Otherwise captures nothing.
 */
Session::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) {
    if (!_opCtx->getWriteUnitOfWork()) {
        return;
    }

    // This must be done under the client lock, since we are modifying '_opCtx'.
    stdx::lock_guard clientLock(*_opCtx->getClient());
    _txnResources = Session::TxnResources(_opCtx);
}
/**
 * Puts the transaction resources captured by the constructor, if any, back onto the operation
 * context.
 */
Session::SideTransactionBlock::~SideTransactionBlock() {
    if (!_txnResources) {
        return;
    }

    // Restore the transaction state onto '_opCtx'. This must be done under the
    // client lock, since we are modifying '_opCtx'.
    stdx::lock_guard clientLock(*_opCtx->getClient());
    _txnResources->release(_opCtx);
}
// Moves the transaction resources of 'opCtx' into this session's stash at the end of an operation
// that is part of a multi-document transaction, and updates the transaction's statistics.
//
// Does nothing when called from within a direct client, or when the session is not in a
// multi-document transaction. 'opCtx' must carry a transaction number (invariant), and the stash
// must be empty when the resources are stored (invariant).
void Session::stashTransactionResources(OperationContext* opCtx) {
if (opCtx->getClient()->isInDirectClient()) {
return;
}
invariant(opCtx->getTxnNumber());
// We must lock the Client to change the Locker on the OperationContext and the Session mutex to
// access Session state. We must lock the Client before the Session mutex, since the Client
// effectively owns the Session. That is, a user might lock the Client to ensure it doesn't go
// away, and then lock the Session owned by that client. We rely on the fact that we are not
// using the DefaultLockerImpl to avoid deadlock.
stdx::lock_guard lk(*opCtx->getClient());
stdx::unique_lock lg(_mutex);
// Always check '_activeTxnNumber', since it can be modified by migration, which does not
// check out the session. We intentionally do not error if _txnState=kAborted, since we
// expect this function to be called at the end of the 'abortTransaction' command.
_checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false);
if (!_txnState.inMultiDocumentTransaction(lg)) {
// Not in a multi-document transaction: nothing to do.
return;
}
// Mark the transaction inactive as of now, unless it already is.
if (_singleTransactionStats->isActive()) {
_singleTransactionStats->setInactive(curTimeMicros64());
}
// Add the latest operation stats to the aggregate OpDebug object stored in the
// SingleTransactionStats instance on the Session.
_singleTransactionStats->getOpDebug()->additiveMetrics.add(
CurOp::get(opCtx)->debug().additiveMetrics);
invariant(!_txnResourceStash);
_txnResourceStash = TxnResources(opCtx);
// We accept possible slight inaccuracies in these counters from non-atomicity.
ServerTransactionsMetrics::get(opCtx)->decrementCurrentActive();
ServerTransactionsMetrics::get(opCtx)->incrementCurrentInactive();
// Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session
// with this Client's information. This is the last client that ran a transaction operation on
// the Session.
_singleTransactionStats->getLastClientInfo()->update(opCtx->getClient());
}
/**
 * Prepares 'opCtx' to run command 'cmdName' as part of this session's active transaction.
 *
 * If resources were stashed by a previous operation they are moved back onto 'opCtx'. If the
 * transaction is in progress and nothing is stashed, a new WriteUnitOfWork is installed on
 * 'opCtx'. In both of those cases a global intent-exclusive lock is requested and the storage
 * snapshot is preallocated before returning.
 *
 * Does nothing when called from within a direct client, for operations that are not part of a
 * multi-document transaction, or when the transaction is neither stashed nor in progress.
 * 'opCtx' must carry a transaction number (invariant).
 *
 * Throws:
 *  - NoSuchTransaction if the transaction has been aborted.
 *  - TransactionCommitted if the transaction has been committed and 'cmdName' is not
 *    "commitTransaction".
 *  - InvalidOptions if resources are stashed and the operation specifies a readConcern.
 *  - Whatever _checkIsActiveTransaction() throws for a mismatched transaction number.
 */
void Session::unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName) {
    if (opCtx->getClient()->isInDirectClient()) {
        return;
    }
    invariant(opCtx->getTxnNumber());

    {
        // We must lock the Client to change the Locker on the OperationContext and the Session
        // mutex to access Session state. We must lock the Client before the Session mutex, since
        // the Client effectively owns the Session. That is, a user might lock the Client to ensure
        // it doesn't go away, and then lock the Session owned by that client.
        //
        // Only 'sessionLock' may be used as the lock witness for Session state such as
        // '_txnState'; 'clientLock' guards the Client and says nothing about the Session mutex.
        stdx::lock_guard clientLock(*opCtx->getClient());
        stdx::lock_guard sessionLock(_mutex);

        // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session
        // kill and migration, which do not check out the session.
        _checkIsActiveTransaction(sessionLock, *opCtx->getTxnNumber(), false);

        // If this is not a multi-document transaction, there is nothing to unstash.
        if (_txnState.isNone(sessionLock)) {
            invariant(!_txnResourceStash);
            return;
        }

        // Throw NoSuchTransaction error instead of TransactionAborted error since this is the entry
        // point of transaction execution.
        uassert(ErrorCodes::NoSuchTransaction,
                str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
                !_txnState.isAborted(sessionLock));

        // Cannot change committed transaction but allow retrying commitTransaction command.
        uassert(ErrorCodes::TransactionCommitted,
                str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been committed.",
                cmdName == "commitTransaction" || !_txnState.isCommitted(sessionLock));

        if (_txnResourceStash) {
            // Transaction resources already exist for this transaction. Transfer them from the
            // stash to the operation context.
            auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
            uassert(ErrorCodes::InvalidOptions,
                    "Only the first command in a transaction may specify a readConcern",
                    readConcernArgs.isEmpty());
            _txnResourceStash->release(opCtx);
            _txnResourceStash = boost::none;

            // Set the starting active time for this transaction.
            if (_txnState.isInProgress(sessionLock)) {
                _singleTransactionStats->setActive(curTimeMicros64());
            }

            // We accept possible slight inaccuracies in these counters from non-atomicity.
            ServerTransactionsMetrics::get(opCtx)->incrementCurrentActive();
            ServerTransactionsMetrics::get(opCtx)->decrementCurrentInactive();
            return;
        }

        // If we have no transaction resources then we cannot be prepared. If we're not in progress,
        // we don't do anything else.
        invariant(!_txnState.isPrepared(sessionLock));
        if (!_txnState.isInProgress(sessionLock)) {
            // At this point we're either committed and this is a 'commitTransaction' command, or we
            // are in the process of committing.
            return;
        }

        // Stashed transaction resources do not exist for this in-progress multi-document
        // transaction. Set up the transaction resources on the opCtx.
        opCtx->setWriteUnitOfWork(std::make_unique<WriteUnitOfWork>(opCtx));
        ServerTransactionsMetrics::get(getGlobalServiceContext())->incrementCurrentActive();

        // Set the starting active time for this transaction.
        _singleTransactionStats->setActive(curTimeMicros64());

        // If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no
        // future lock request waits longer than maxTransactionLockRequestTimeoutMillis
        // to acquire a lock. This is to avoid deadlocks and minimize non-transaction
        // operation performance degradations.
        auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load();
        if (maxTransactionLockMillis >= 0) {
            opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis));
        }
    }

    // Storage engine transactions may be started in a lazy manner. By explicitly
    // starting here we ensure that a point-in-time snapshot is established during the
    // first operation of a transaction.
    //
    // Active transactions are protected by the locking subsystem, so we must always hold at least a
    // Global intent lock before starting a transaction. We pessimistically acquire an intent
    // exclusive lock here because we might be doing writes in this transaction, and it is currently
    // not deadlock-safe to upgrade IS to IX.
    //
    // NOTE(review): the GlobalLock below is an unnamed temporary, so its destructor runs at the end
    // of the statement. Presumably the locker keeps the lock until the enclosing WriteUnitOfWork
    // ends - confirm against the Locker implementation before restructuring this.
    Lock::GlobalLock(opCtx, MODE_IX);
    opCtx->recoveryUnit()->preallocateSnapshot();

    // The Client lock must not be held when executing this failpoint as it will block currentOp
    // execution.
    MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterPreallocateSnapshot);
}
// Moves the active transaction into the prepared state and returns the prepare timestamp reported
// by the operation's recovery unit.
//
// If this function exits before the guard is dismissed (for example because a check below throws),
// the guard aborts the active transaction through abortActiveTransaction().
Timestamp Session::prepareTransaction(OperationContext* opCtx) {
// This ScopeGuard is created outside of the lock so that the lock is always released before
// this is called.
ScopeGuard abortGuard = MakeGuard([&] { abortActiveTransaction(opCtx); });
stdx::unique_lock lk(_mutex);
// Always check '_activeTxnNumber' and '_txnState', since they can be modified by
// session kill and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
_txnState.transitionTo(lk, TransitionTable::State::kPrepared);
// We need to unlock the session to run the opObserver onTransactionPrepare, which calls back
// into the session.
lk.unlock();
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
opObserver->onTransactionPrepare(opCtx);
lk.lock();
// The mutex was released above, so the transaction is re-validated before continuing.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
// Ensure that the transaction is still prepared.
invariant(_txnState.isPrepared(lk), str::stream() << "Current state: " << _txnState);
opCtx->getWriteUnitOfWork()->prepare();
abortGuard.Dismiss();
// Return the prepareTimestamp from the recovery unit.
return opCtx->recoveryUnit()->getPrepareTimestamp();
}
void Session::abortArbitraryTransaction() {
stdx::lock_guard lock(_mutex);
_abortArbitraryTransaction(lock);
}
void Session::abortArbitraryTransactionIfExpired() {
stdx::lock_guard lock(_mutex);
if (!_transactionExpireDate || _transactionExpireDate >= Date_t::now()) {
return;
}
_abortArbitraryTransaction(lock);
}
// Aborts the transaction only while it is in progress. Transactions in any other state are left
// untouched; in particular a prepared transaction is only aborted by an explicit
// 'abortTransaction' command.
void Session::_abortArbitraryTransaction(WithLock lock) {
    if (_txnState.isInProgress(lock)) {
        _abortTransaction(lock);
    }
}
// Aborts the multi-document transaction running on 'opCtx', if there is one: the session state is
// aborted, the operation's WriteUnitOfWork is dropped, its recovery unit is replaced with a fresh
// one, and the operation's metrics and client information are folded into the transaction stats.
// Does nothing when the session is not in a multi-document transaction.
void Session::abortActiveTransaction(OperationContext* opCtx) {
    stdx::lock_guard sessionLock(_mutex);

    if (!_txnState.inMultiDocumentTransaction(sessionLock)) {
        return;
    }

    _abortTransaction(sessionLock);

    // An empty transaction may have no WriteUnitOfWork; only drop it when one exists.
    if (opCtx->getWriteUnitOfWork()) {
        opCtx->setWriteUnitOfWork(nullptr);
    }

    // Install a fresh recovery unit and clear the lock timeout so that any post-transaction
    // writes run without transactional settings such as a read timestamp.
    auto storageEngine = opCtx->getServiceContext()->getStorageEngine();
    opCtx->setRecoveryUnit(storageEngine->newRecoveryUnit(),
                           WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
    opCtx->lockState()->unsetMaxLockTimeout();

    // Fold this operation's stats into the aggregate OpDebug kept in the session's
    // SingleTransactionStats.
    const auto& latestOpMetrics = CurOp::get(opCtx)->debug().additiveMetrics;
    _singleTransactionStats->getOpDebug()->additiveMetrics.add(latestOpMetrics);

    // Record this Client as the last one to have run the transaction.
    _singleTransactionStats->getLastClientInfo()->update(opCtx->getClient());
}
// Aborts the current transaction while the session mutex is held by the caller: releases any
// stashed transaction resources, discards the buffered operations, transitions the state machine
// to kAborted, and updates the server-wide and per-transaction metrics.
void Session::_abortTransaction(WithLock wl) {
    const auto txnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext());

    // If the transaction is stashed, then we have aborted an inactive transaction.
    if (_txnResourceStash) {
        txnMetrics->decrementCurrentInactive();
    } else {
        txnMetrics->decrementCurrentActive();
    }

    _txnResourceStash = boost::none;
    _transactionOperationBytes = 0;
    _transactionOperations.clear();
    _txnState.transitionTo(wl, TransitionTable::State::kAborted);
    _speculativeTransactionReadOpTime = repl::OpTime();
    txnMetrics->incrementTotalAborted();

    if (!_txnState.isNone(wl)) {
        // Use a single clock sample so that the recorded end time and the active/inactive
        // accounting agree with each other, as the failed-commit path does.
        const auto curTime = curTimeMicros64();
        _singleTransactionStats->setEndTime(curTime);
        // The transaction has aborted, so we mark it as inactive.
        if (_singleTransactionStats->isActive()) {
            _singleTransactionStats->setInactive(curTime);
        }
    }

    txnMetrics->decrementCurrentOpen();
}
// Validates the session and 'txnNumber', then makes 'txnNumber' the active transaction unless it
// already is.
void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) {
    _checkValid(wl);
    _checkTxnValid(wl, txnNumber);

    // The same number means an existing transaction is being continued; nothing to set up.
    if (txnNumber != _activeTxnNumber) {
        _setActiveTxn(wl, txnNumber);
    }
}
// Makes 'txnNumber' the active transaction number and resets all per-transaction cached state.
void Session::_setActiveTxn(WithLock wl, TxnNumber txnNumber) {
    // A transaction that is still in progress is aborted first. Other states are not aborted
    // here.
    if (_txnState.isInProgress(wl)) {
        _abortTransaction(wl);
    }

    _activeTxnNumber = txnNumber;

    // Forget everything that was cached for the previous transaction number.
    _activeTxnCommittedStatements.clear();
    _hasIncompleteHistory = false;

    _txnState.transitionTo(wl, TransitionTable::State::kNone);

    _singleTransactionStats = boost::none;
    _speculativeTransactionReadOpTime = repl::OpTime();
    _multikeyPathInfo.clear();
}
// Buffers 'operation' as part of the in-progress transaction and adds its size to the running
// byte total. Throws TransactionTooLarge once that total exceeds BSONObjMaxInternalSize.
void Session::addTransactionOperation(OperationContext* opCtx,
                                      const repl::ReplOperation& operation) {
    stdx::lock_guard sessionLock(_mutex);

    // '_activeTxnNumber' and '_txnState' can be changed by session kill and migration, neither of
    // which checks out the session, so they are always re-checked here.
    _checkIsActiveTransaction(sessionLock, *opCtx->getTxnNumber(), true);

    // Operations may only ever be added to a transaction that is in progress.
    invariant(_txnState.isInProgress(sessionLock),
              str::stream() << "Current state: " << _txnState);
    invariant(!_autocommit && _activeTxnNumber != kUninitializedTxnNumber);
    invariant(opCtx->lockState()->inAWriteUnitOfWork());

    _transactionOperations.push_back(operation);
    const auto operationSize = repl::OplogEntry::getReplOperationSize(operation);
    _transactionOperationBytes += operationSize;

    // The running total is based on the in-memory size of each operation. With overhead the BSON
    // size is expected to be larger, so a transaction that is only slightly too large may pass
    // this check and fail at commit. Failing early is still worthwhile, e.g. to avoid exhausting
    // server memory.
    uassert(ErrorCodes::TransactionTooLarge,
            str::stream() << "Total size of all transaction operations must be less than "
                          << BSONObjMaxInternalSize
                          << ". Actual size is "
                          << _transactionOperationBytes,
            _transactionOperationBytes <= BSONObjMaxInternalSize);
}
// Hands the buffered transaction operations over to the caller (by move) and resets the running
// byte total. The transaction must be prepared or committing without prepare.
std::vector Session::endTransactionAndRetrieveOperations(
    OperationContext* opCtx) {
    stdx::lock_guard sessionLock(_mutex);

    // '_activeTxnNumber' and '_txnState' can be changed by session kill and migration, neither of
    // which checks out the session, so they are always re-checked here.
    _checkIsActiveTransaction(sessionLock, *opCtx->getTxnNumber(), true);

    // A transaction is only ever ended while prepared or while committing.
    const bool endingAllowed =
        _txnState.isPrepared(sessionLock) || _txnState.isCommittingWithoutPrepare(sessionLock);
    invariant(endingAllowed, str::stream() << "Current state: " << _txnState);
    invariant(!_autocommit);

    _transactionOperationBytes = 0;
    return std::move(_transactionOperations);
}
// Public entry point for committing: takes the session mutex, validates that the operation's
// transaction is the active, non-aborted one, and passes the held lock on to
// _commitTransaction().
void Session::commitTransaction(OperationContext* opCtx,
                                boost::optional commitTimestamp) {
    stdx::unique_lock sessionLock(_mutex);

    // '_activeTxnNumber' and '_txnState' can be changed by session kill and migration, neither of
    // which checks out the session, so they are always re-checked here.
    _checkIsActiveTransaction(sessionLock, *opCtx->getTxnNumber(), true);

    _commitTransaction(std::move(sessionLock), opCtx, commitTimestamp);
}
// Commits the active transaction. 'lk' is used as a held lock on the session mutex on entry.
//
// - For a prepared transaction, 'commitTimestamp' must be provided and non-null (InvalidOptions
//   otherwise); it is set as the commit timestamp on the recovery unit.
// - For a non-prepared transaction, 'commitTimestamp' must not be provided (InvalidOptions
//   otherwise).
//
// The session mutex is released while the OpObserver is notified and again while the
// WriteUnitOfWork commits; the active transaction is re-validated after the first re-acquisition.
// A block-exit handler always installs a fresh recovery unit and clears the max lock timeout, and,
// if the commit did not complete, marks the transaction as aborted and updates the metrics.
void Session::_commitTransaction(stdx::unique_lock lk,
                                 OperationContext* opCtx,
                                 boost::optional commitTimestamp) {
    if (_txnState.isPrepared(lk)) {
        uassert(ErrorCodes::InvalidOptions,
                "commitTransaction on a prepared transaction expects a 'commitTimestamp'",
                commitTimestamp);
        uassert(ErrorCodes::InvalidOptions,
                "'commitTimestamp' cannot be null",
                !commitTimestamp->isNull());
        _txnState.transitionTo(lk, TransitionTable::State::kCommittingWithPrepare);
        opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp.get());
    } else {
        uassert(ErrorCodes::InvalidOptions,
                "commitTransaction on a non-prepared transaction cannot have a 'commitTimestamp'",
                !commitTimestamp);
        _txnState.transitionTo(lk, TransitionTable::State::kCommittingWithoutPrepare);
    }

    // We need to unlock the session to run the opObserver onTransactionCommit, which calls back
    // into the session.
    lk.unlock();
    auto opObserver = opCtx->getServiceContext()->getOpObserver();
    invariant(opObserver);
    const bool wasPrepared = commitTimestamp != boost::none;
    opObserver->onTransactionCommit(opCtx, wasPrepared);
    lk.lock();

    // Always check '_activeTxnNumber' and '_txnState', since they can be modified by session
    // kill and migration, which do not check out the session.
    _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);

    bool committed = false;
    ON_BLOCK_EXIT([this, &committed, opCtx]() {
        // If we're still "committing", the recovery unit failed to commit, and the lock is not
        // held. We can't safely use _txnState here, as it is protected by the lock.
        if (!committed) {
            stdx::lock_guard lk(_mutex);
            opCtx->setWriteUnitOfWork(nullptr);

            // Make sure the transaction didn't change because of chunk migration.
            if (opCtx->getTxnNumber() == _activeTxnNumber) {
                _txnState.transitionTo(lk, TransitionTable::State::kAborted);
                ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive();

                // After the transaction has been aborted, we must update the end time and mark it
                // as inactive.
                auto curTime = curTimeMicros64();
                _singleTransactionStats->setEndTime(curTime);
                if (_singleTransactionStats->isActive()) {
                    _singleTransactionStats->setInactive(curTime);
                }

                ServerTransactionsMetrics::get(opCtx)->incrementTotalAborted();
                ServerTransactionsMetrics::get(opCtx)->decrementCurrentOpen();

                // Add the latest operation stats to the aggregate OpDebug object stored in the
                // SingleTransactionStats instance on the Session.
                _singleTransactionStats->getOpDebug()->additiveMetrics.add(
                    CurOp::get(opCtx)->debug().additiveMetrics);

                // Update the LastClientInfo object stored in the SingleTransactionStats instance on
                // the Session with this Client's information.
                _singleTransactionStats->getLastClientInfo()->update(opCtx->getClient());
            }
        }

        // We must clear the recovery unit and locker so any post-transaction writes can run without
        // transactional settings such as a read timestamp.
        opCtx->setRecoveryUnit(opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit(),
                               WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork);
        opCtx->lockState()->unsetMaxLockTimeout();
    });

    // The storage commit runs without the session mutex. If it throws, 'committed' stays false
    // and the block-exit handler above takes the mutex itself to record the abort.
    lk.unlock();
    opCtx->getWriteUnitOfWork()->commit();
    opCtx->setWriteUnitOfWork(nullptr);
    committed = true;
    lk.lock();

    auto& clientInfo = repl::ReplClientInfo::forClient(opCtx->getClient());

    // If no writes have been done, set the client optime forward to the read timestamp so waiting
    // for write concern will ensure all read data was committed.
    //
    // TODO(SERVER-34881): Once the default read concern is speculative majority, only set the
    // client optime forward if the original read concern level is "majority" or "snapshot".
    if (_speculativeTransactionReadOpTime > clientInfo.getLastOp()) {
        clientInfo.setLastOp(_speculativeTransactionReadOpTime);
    }

    _txnState.transitionTo(lk, TransitionTable::State::kCommitted);
    ServerTransactionsMetrics::get(opCtx)->incrementTotalCommitted();

    // After the transaction has been committed, we must update the end time and mark it as
    // inactive. A single clock sample is used for both so the recorded times agree with each
    // other, matching the failed-commit path above.
    const auto curTime = curTimeMicros64();
    _singleTransactionStats->setEndTime(curTime);
    if (_singleTransactionStats->isActive()) {
        _singleTransactionStats->setInactive(curTime);
    }

    ServerTransactionsMetrics::get(opCtx)->decrementCurrentOpen();
    ServerTransactionsMetrics::get(getGlobalServiceContext())->decrementCurrentActive();

    // Add the latest operation stats to the aggregate OpDebug object stored in the
    // SingleTransactionStats instance on the Session.
    _singleTransactionStats->getOpDebug()->additiveMetrics.add(
        CurOp::get(opCtx)->debug().additiveMetrics);

    // Update the LastClientInfo object stored in the SingleTransactionStats instance on the Session
    // with this Client's information.
    _singleTransactionStats->getLastClientInfo()->update(opCtx->getClient());
}
// Convenience overload: collects the stashed-state report into a fresh builder and returns the
// resulting object.
BSONObj Session::reportStashedState() const {
    BSONObjBuilder stashedState;
    reportStashedState(&stashedState);
    return stashedState.obj();
}
// Appends a description of the stashed (inactive) transaction to 'builder'. Nothing is appended
// unless resources are stashed, the stash has a locker, and that locker yields locker info.
void Session::reportStashedState(BSONObjBuilder* builder) const {
    stdx::lock_guard ls(_mutex);

    if (!_txnResourceStash || !_txnResourceStash->locker()) {
        return;
    }

    auto lockerInfo = _txnResourceStash->locker()->getLockerInfo();
    if (!lockerInfo) {
        return;
    }

    invariant(_activeTxnNumber != kUninitializedTxnNumber);

    builder->append("host", getHostNameCachedAndPort());
    builder->append("desc", "inactive transaction");

    // The sub-object builder lives in its own scope so it is finished before further fields are
    // appended to 'builder'.
    {
        BSONObjBuilder lsid(builder->subobjStart("lsid"));
        getSessionId().serialize(&lsid);
    }

    BSONObjBuilder transactionBuilder;
    _reportTransactionStats(ls, &transactionBuilder);
    builder->append("transaction", transactionBuilder.obj());

    builder->append("waitingForLock", false);
    builder->append("active", false);
    fillLockerInfo(*lockerInfo, *builder);
}
// Appends the transaction statistics to 'builder' under "transaction", but only while no
// transaction resources are stashed.
void Session::reportUnstashedState(BSONObjBuilder* builder) const {
    stdx::lock_guard ls(_mutex);

    if (_txnResourceStash) {
        return;
    }

    BSONObjBuilder transactionBuilder;
    _reportTransactionStats(ls, &transactionBuilder);
    builder->append("transaction", transactionBuilder.obj());
}
// Appends transaction statistics to 'builder'.
//
// A "parameters" sub-object containing the active txnNumber is always written. When the session is
// in a multi-document transaction, "autocommit" is added to it and the fields
// "startWallClockTime", "timeOpenMicros", "timeActiveMicros" and "timeInactiveMicros" are appended
// as well.
void Session::_reportTransactionStats(WithLock wl, BSONObjBuilder* builder) const {
BSONObjBuilder parametersBuilder(builder->subobjStart("parameters"));
parametersBuilder.append("txnNumber", _activeTxnNumber);
if (!_txnState.inMultiDocumentTransaction(wl)) {
// For retryable writes, we only include the txnNumber.
parametersBuilder.done();
return;
}
parametersBuilder.append("autocommit", _autocommit);
parametersBuilder.done();
// The start time is divided by 1000 before being interpreted as milliseconds since the epoch
// (presumably it is stored in microseconds -- confirm against SingleTransactionStats).
builder->append("startWallClockTime",
dateToISOStringLocal(Date_t::fromMillisSinceEpoch(
_singleTransactionStats->getStartTime() / 1000)));
// We use the same "now" time so that the following time metrics are consistent with each other.
auto curTime = curTimeMicros64();
builder->append("timeOpenMicros",
static_cast(_singleTransactionStats->getDuration(curTime)));
auto timeActive =
durationCount(_singleTransactionStats->getTimeActiveMicros(curTime));
auto timeInactive =
durationCount(_singleTransactionStats->getTimeInactiveMicros(curTime));
builder->append("timeActiveMicros", timeActive);
builder->append("timeInactiveMicros", timeInactive);
}
// Throws ConflictingOperationInProgress when the session has been marked invalid, signalling the
// caller that the operation must be retried.
void Session::_checkValid(WithLock) const {
    uassert(ErrorCodes::ConflictingOperationInProgress,
            str::stream() << "Session " << getSessionId()
                          << " was concurrently modified and the operation must be retried.",
            _isValid);
}
// Verifies that 'txnNumber' is the session's active transaction number, throwing
// ConflictingOperationInProgress if it is not. When 'checkAbort' is true, additionally throws
// NoSuchTransaction if the transaction state is aborted.
void Session::_checkIsActiveTransaction(WithLock wl, TxnNumber txnNumber, bool checkAbort) const {
    const bool isActiveTxn = (txnNumber == _activeTxnNumber);
    uassert(ErrorCodes::ConflictingOperationInProgress,
            str::stream() << "Cannot perform operations on transaction " << txnNumber
                          << " on session "
                          << getSessionId()
                          << " because a different transaction "
                          << _activeTxnNumber
                          << " is now active.",
            isActiveTxn);

    // The aborted state is only consulted when the caller asked for the abort check.
    uassert(ErrorCodes::NoSuchTransaction,
            str::stream() << "Transaction " << txnNumber << " has been aborted.",
            !(checkAbort && _txnState.isAborted(wl)));
}
// Looks up 'stmtId' among the statements already committed for the active transaction and returns
// the associated value, or boost::none if the statement has not been recorded. Throws
// IncompleteTransactionHistory if the statement is not found and the history is known to be
// incomplete.
boost::optional Session::_checkStatementExecuted(WithLock wl,
                                                 TxnNumber txnNumber,
                                                 StmtId stmtId) const {
    _checkValid(wl);
    _checkIsActiveTransaction(wl, txnNumber, false);

    // Retries are not detected for multi-document transactions.
    if (!_txnState.isNone(wl)) {
        return boost::none;
    }

    const auto committedStmt = _activeTxnCommittedStatements.find(stmtId);
    if (committedStmt != _activeTxnCommittedStatements.end()) {
        // A recorded statement implies a written session record for this transaction number.
        invariant(_lastWrittenSessionRecord);
        invariant(_lastWrittenSessionRecord->getTxnNum() == txnNumber);
        return committedStmt->second;
    }

    // Not found: with incomplete history we cannot tell whether the statement ever executed.
    uassert(ErrorCodes::IncompleteTransactionHistory,
            str::stream() << "Incomplete history detected for transaction " << txnNumber
                          << " on session "
                          << _sessionId.toBSON(),
            !_hasIncompleteHistory);
    return boost::none;
}
// Returns the last write date from the cached session record when that record belongs to
// 'txnNumber'; otherwise returns a default-constructed Date_t.
Date_t Session::_getLastWriteDate(WithLock wl, TxnNumber txnNumber) const {
    _checkValid(wl);
    _checkIsActiveTransaction(wl, txnNumber, false);

    const bool haveRecordForTxn =
        _lastWrittenSessionRecord && _lastWrittenSessionRecord->getTxnNum() == txnNumber;
    if (haveRecordForTxn) {
        return _lastWrittenSessionRecord->getLastWriteDate();
    }
    return {};
}
// Builds an upsert against the session transactions table that replaces this session's record
// with one carrying the given transaction number, last write optime and last write date.
UpdateRequest Session::_makeUpdateRequest(WithLock,
                                          TxnNumber newTxnNumber,
                                          const repl::OpTime& newLastWriteOpTime,
                                          Date_t newLastWriteDate) const {
    UpdateRequest updateRequest(NamespaceString::kSessionTransactionsTableNamespace);

    // Serialize the new state of the session record; this becomes the update document.
    SessionTxnRecord txnRecord;
    txnRecord.setSessionId(_sessionId);
    txnRecord.setTxnNum(newTxnNumber);
    txnRecord.setLastWriteOpTime(newLastWriteOpTime);
    txnRecord.setLastWriteDate(newLastWriteDate);
    const auto updateBSON = txnRecord.toBSON();

    updateRequest.setUpdates(updateBSON);
    // The record is matched by session id.
    updateRequest.setQuery(BSON(SessionTxnRecord::kSessionIdFieldName << _sessionId.toBSON()));
    updateRequest.setUpsert(true);

    return updateRequest;
}
// Registers an onCommit callback on the operation's recovery unit that brings the in-memory
// session cache up to date, then evaluates the 'onPrimaryTransactionalWrite' fail point.
//
// The callback increments the transactions-collection write counter and then, under the session
// mutex and only while the session is still valid:
//  - creates or advances '_lastWrittenSessionRecord' (txn number and last write optime only move
//    forward),
//  - begins 'newTxnNumber' if it is newer than the active transaction number,
//  - records each id in 'stmtIdsWritten' as committed at 'lastStmtIdWriteOpTime' when
//    'newTxnNumber' is the active transaction.
void Session::_registerUpdateCacheOnCommit(OperationContext* opCtx,
TxnNumber newTxnNumber,
std::vector stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteOpTime) {
opCtx->recoveryUnit()->onCommit(
[ this, newTxnNumber, stmtIdsWritten = std::move(stmtIdsWritten), lastStmtIdWriteOpTime ](
boost::optional) {
RetryableWritesStats::get(getGlobalServiceContext())
->incrementTransactionsCollectionWriteCount();
stdx::lock_guard lg(_mutex);
// An invalidated session's cache is not updated.
if (!_isValid)
return;
// The cache of the last written record must always be advanced after a write so that
// subsequent writes have the correct point to start from.
if (!_lastWrittenSessionRecord) {
_lastWrittenSessionRecord.emplace();
_lastWrittenSessionRecord->setSessionId(_sessionId);
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
_lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
} else {
if (newTxnNumber > _lastWrittenSessionRecord->getTxnNum())
_lastWrittenSessionRecord->setTxnNum(newTxnNumber);
if (lastStmtIdWriteOpTime > _lastWrittenSessionRecord->getLastWriteOpTime())
_lastWrittenSessionRecord->setLastWriteOpTime(lastStmtIdWriteOpTime);
}
if (newTxnNumber > _activeTxnNumber) {
// This call is necessary in order to advance the txn number and reset the cached
// state in the case where just before the storage transaction commits, the cache
// entry gets invalidated and immediately refreshed while there were no writes for
// newTxnNumber yet. In this case _activeTxnNumber will be less than newTxnNumber
// and we will fail to update the cache even though the write was successful.
_beginOrContinueTxn(lg, newTxnNumber, boost::none, boost::none);
}
if (newTxnNumber == _activeTxnNumber) {
for (const auto stmtId : stmtIdsWritten) {
// The sentinel statement id only flags the history as incomplete; it is not recorded
// as a committed statement.
if (stmtId == kIncompleteHistoryStmtId) {
_hasIncompleteHistory = true;
continue;
}
const auto insertRes =
_activeTxnCommittedStatements.emplace(stmtId, lastStmtIdWriteOpTime);
// The statement id was already recorded: hand both optimes to
// fassertOnRepeatedExecution.
if (!insertRes.second) {
const auto& existingOpTime = insertRes.first->second;
fassertOnRepeatedExecution(_sessionId,
newTxnNumber,
stmtId,
existingOpTime,
lastStmtIdWriteOpTime);
}
}
}
});
MONGO_FAIL_POINT_BLOCK(onPrimaryTransactionalWrite, customArgs) {
const auto& data = customArgs.getData();
// The client's session is ended unless 'closeConnection' is present and false.
const auto closeConnectionElem = data["closeConnection"];
if (closeConnectionElem.eoo() || closeConnectionElem.Bool()) {
opCtx->getClient()->session()->end();
}
// When 'failBeforeCommitExceptionCode' is present, the write is failed with that error code.
const auto failBeforeCommitExceptionElem = data["failBeforeCommitExceptionCode"];
if (!failBeforeCommitExceptionElem.eoo()) {
const auto failureCode = ErrorCodes::Error(int(failBeforeCommitExceptionElem.Number()));
uasserted(failureCode,
str::stream() << "Failing write for " << _sessionId << ":" << newTxnNumber
<< " due to failpoint. The write must not be reflected.");
}
}
}
// Builds the oplog entry that upserts the session transactions table record corresponding to
// 'entry'. Returns boost::none when 'entry' carries no transaction number.
boost::optional Session::createMatchingTransactionTableUpdate(
    const repl::OplogEntry& entry) {
    auto opSessionInfo = entry.getOperationSessionInfo();
    if (!opSessionInfo.getTxnNumber()) {
        return boost::none;
    }

    // An entry with a transaction number must also carry a session id and a wall clock time.
    invariant(opSessionInfo.getSessionId());
    invariant(entry.getWallClockTime());
    const auto wallClockTime = *entry.getWallClockTime();

    // The new contents of the session record, serialized as the update document.
    SessionTxnRecord txnRecord;
    txnRecord.setSessionId(*opSessionInfo.getSessionId());
    txnRecord.setTxnNum(*opSessionInfo.getTxnNumber());
    txnRecord.setLastWriteOpTime(entry.getOpTime());
    txnRecord.setLastWriteDate(wallClockTime);
    const auto updateBSON = txnRecord.toBSON();

    // The record is matched by session id.
    const auto sessionIdQuery =
        BSON(SessionTxnRecord::kSessionIdFieldName << opSessionInfo.getSessionId()->toBSON());

    return repl::OplogEntry(entry.getOpTime(),
                            0,  // hash
                            repl::OpTypeEnum::kUpdate,
                            NamespaceString::kSessionTransactionsTableNamespace,
                            boost::none,  // uuid
                            false,        // fromMigrate
                            repl::OplogEntry::kOplogVersion,
                            updateBSON,
                            sessionIdQuery,
                            {},    // sessionInfo
                            true,  // upsert
                            wallClockTime,
                            boost::none,  // statementId
                            boost::none,  // prevWriteOpTime
                            boost::none,  // preImageOpTime
                            boost::none   // postImageOpTime
                            );
}
// Returns the human-readable name of 'state', as used in diagnostics.
std::string Session::TransitionTable::toString(State state) {
    // No default case: every enumerator returns explicitly.
    switch (state) {
        case State::kNone:
            return "TxnState::None";
        case State::kInProgress:
            return "TxnState::InProgress";
        case State::kPrepared:
            return "TxnState::Prepared";
        case State::kCommittingWithoutPrepare:
            return "TxnState::CommittingWithoutPrepare";
        case State::kCommittingWithPrepare:
            return "TxnState::CommittingWithPrepare";
        case State::kCommitted:
            return "TxnState::Committed";
        case State::kAborted:
            return "TxnState::Aborted";
    }
    MONGO_UNREACHABLE;
}
// Returns whether the transaction state machine may move from 'oldState' to 'newState'.
//
// Allowed transitions:
//   None                      -> None, InProgress
//   InProgress                -> None, Prepared, CommittingWithoutPrepare, Aborted
//   Prepared                  -> CommittingWithPrepare, Aborted
//   CommittingWith(out)Prepare -> None, Committed, Aborted
//   Committed, Aborted        -> None, InProgress
bool Session::TransitionTable::_isLegalTransition(State oldState, State newState) {
    switch (oldState) {
        case State::kNone:
            return newState == State::kNone || newState == State::kInProgress;

        case State::kInProgress:
            return newState == State::kNone || newState == State::kPrepared ||
                newState == State::kCommittingWithoutPrepare || newState == State::kAborted;

        case State::kPrepared:
            return newState == State::kCommittingWithPrepare || newState == State::kAborted;

        case State::kCommittingWithPrepare:
        case State::kCommittingWithoutPrepare:
            return newState == State::kNone || newState == State::kCommitted ||
                newState == State::kAborted;

        case State::kCommitted:
        case State::kAborted:
            return newState == State::kNone || newState == State::kInProgress;
    }
    MONGO_UNREACHABLE;
}
// Sets the current state to 'newState'. When 'shouldValidate' requests validation, the
// transition is first checked against _isLegalTransition() and an illegal one trips an invariant.
void Session::TransitionTable::transitionTo(WithLock,
                                            State newState,
                                            TransitionValidation shouldValidate) {
    const bool mustValidate = (shouldValidate == TransitionValidation::kValidateTransition);
    if (mustValidate) {
        invariant(_isLegalTransition(_state, newState),
                  str::stream() << "Current state: " << toString(_state)
                                << ", Illegal attempted next state: "
                                << toString(newState));
    }

    _state = newState;
}
} // namespace mongo