/** * Copyright (C) 2018 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see <http://www.gnu.org/licenses/>. * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. 
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kStorage #include "mongo/platform/basic.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/commands/test_commands_enabled.h" #include "mongo/db/concurrency/d_concurrency.h" #include "mongo/db/concurrency/lock_state.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/curop_failpoint_helpers.h" #include "mongo/db/op_observer.h" #include "mongo/db/repl/repl_client_info.h" #include "mongo/db/server_parameters.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/session.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/fill_locker_info.h" #include "mongo/util/fail_point_service.h" #include "mongo/util/log.h" #include "mongo/util/net/socket_utils.h" namespace mongo { // Server parameter that dictates the max number of milliseconds that any transaction lock request // will wait for lock acquisition. If an operation provides a greater timeout in a lock request, // maxTransactionLockRequestTimeoutMillis will override it. If this is set to a negative value, it // is inactive and nothing will be overridden. // // 5 milliseconds will help avoid deadlocks, but will still allow fast-running metadata operations // to run without aborting transactions. MONGO_EXPORT_SERVER_PARAMETER(maxTransactionLockRequestTimeoutMillis, int, 5); // Server parameter that dictates the lifetime given to each transaction. // Transactions must eventually expire to preempt storage cache pressure immobilizing the system. MONGO_EXPORT_SERVER_PARAMETER(transactionLifetimeLimitSeconds, std::int32_t, 60) ->withValidator([](const auto& potentialNewValue) { if (potentialNewValue < 1) { return Status(ErrorCodes::BadValue, "transactionLifetimeLimitSeconds must be greater than or equal to 1s"); } return Status::OK(); }); namespace { // Failpoint which will pause an operation just after allocating a point-in-time storage engine // transaction. 
MONGO_FAIL_POINT_DEFINE(hangAfterPreallocateSnapshot); MONGO_FAIL_POINT_DEFINE(hangAfterReservingPrepareTimestamp); const auto getTransactionParticipant = Session::declareDecoration(); // The command names that are allowed in a multi-document transaction. const StringMap txnCmdWhitelist = {{"abortTransaction", 1}, {"aggregate", 1}, {"commitTransaction", 1}, {"coordinateCommitTransaction", 1}, {"delete", 1}, {"distinct", 1}, {"doTxn", 1}, {"find", 1}, {"findandmodify", 1}, {"findAndModify", 1}, {"geoSearch", 1}, {"getMore", 1}, {"insert", 1}, {"killCursors", 1}, {"prepareTransaction", 1}, {"update", 1}, {"voteAbortTransaction", 1}, {"voteCommitTransaction", 1}}; // The command names that are allowed in a multi-document transaction only when test commands are // enabled. const StringMap txnCmdForTestingWhitelist = {{"dbHash", 1}}; // The commands that can be run on the 'admin' database in multi-document transactions. const StringMap txnAdminCommands = {{"abortTransaction", 1}, {"commitTransaction", 1}, {"coordinateCommitTransaction", 1}, {"doTxn", 1}, {"prepareTransaction", 1}, {"voteAbortTransaction", 1}, {"voteCommitTransaction", 1}}; // The command names that are allowed in a prepared transaction. 
const StringMap preparedTxnCmdWhitelist = { {"abortTransaction", 1}, {"commitTransaction", 1}, {"prepareTransaction", 1}}; } // unnamed namespace TransactionParticipant* TransactionParticipant::get(OperationContext* opCtx) { auto session = OperationContextSession::get(opCtx); if (!session) { return nullptr; } return &getTransactionParticipant(session); } TransactionParticipant* TransactionParticipant::getFromNonCheckedOutSession(Session* session) { return &getTransactionParticipant(session); } const Session* TransactionParticipant::_getSession() const { return getTransactionParticipant.owner(this); } Session* TransactionParticipant::_getSession() { return getTransactionParticipant.owner(this); } void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) { if (txnNumber > _activeTxnNumber) { // New retryable write. _setNewTxnNumber(wl, txnNumber); _autoCommit = boost::none; } else { // Retrying a retryable write. uassert(ErrorCodes::InvalidOptions, "Must specify autocommit=false on all operations of a multi-statement transaction.", _txnState.isNone(wl)); invariant(_autoCommit == boost::none); } } void TransactionParticipant::_continueMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber) { uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Given transaction number " << txnNumber << " does not match any in-progress transactions. The active transaction number is " << _activeTxnNumber, txnNumber == _activeTxnNumber && !_txnState.isNone(wl)); if (_txnState.isInProgress(wl) && !_txnResourceStash) { // This indicates that the first command in the transaction failed but did not // implicitly abort the transaction. It is not safe to continue the transaction, in // particular because we have not saved the readConcern from the first statement of // the transaction. 
_abortTransactionOnSession(wl); uasserted(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << txnNumber << " has been aborted."); } return; } void TransactionParticipant::_beginMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber) { // Aborts any in-progress txns. _setNewTxnNumber(wl, txnNumber); _autoCommit = false; _txnState.transitionTo(wl, TransactionState::kInProgress); // Start tracking various transactions metrics. auto curTime = curTimeMicros64(); _transactionExpireDate = Date_t::fromMillisSinceEpoch(curTime / 1000) + stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()}; { stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onStart( ServerTransactionsMetrics::get(getGlobalServiceContext()), *_autoCommit, curTime, *_transactionExpireDate); } invariant(_transactionOperations.empty()); } void TransactionParticipant::beginOrContinue(TxnNumber txnNumber, boost::optional autocommit, boost::optional startTransaction) { stdx::lock_guard lg(_mutex); if (auto newState = _getSession()->getLastRefreshState()) { _updateState(lg, *newState); } // Requests without an autocommit field are interpreted as retryable writes. They cannot specify // startTransaction, which is verified earlier when parsing the request. if (!autocommit) { invariant(!startTransaction); _beginOrContinueRetryableWrite(lg, txnNumber); return; } // Attempt to continue a multi-statement transaction. In this case, it is required that // autocommit be given as an argument on the request, and currently it can only be false, which // is verified earlier when parsing the request. invariant(*autocommit == false); if (!startTransaction) { _continueMultiDocumentTransaction(lg, txnNumber); return; } // Attempt to start a multi-statement transaction, which requires startTransaction be given as // an argument on the request. startTransaction can only be specified as true, which is verified // earlier when parsing the request. 
invariant(*startTransaction); // Servers in a sharded cluster can start a new transaction at the active transaction number to // allow internal retries by routers on re-targeting errors, like StaleShardVersion or // SnapshotTooOld. if (txnNumber == _activeTxnNumber) { uassert(ErrorCodes::ConflictingOperationInProgress, "Only servers in a sharded cluster can start a new transaction at the active " "transaction number", serverGlobalParams.clusterRole != ClusterRole::None); // The active transaction number can only be reused if the transaction is not in a state // that indicates it has been involved in a two phase commit. In normal operation this check // should never fail. // // TODO SERVER-36639: Ensure the active transaction number cannot be reused if the // transaction is in the abort after prepare state (or any state indicating the participant // has been involved in a two phase commit). const auto restartableStates = TransactionState::kInProgress | TransactionState::kAborted; uassert(50911, str::stream() << "Cannot start a transaction at given transaction number " << txnNumber << " a transaction with the same number is in state " << _txnState.toString(), _txnState.isInSet(lg, restartableStates)); } _beginMultiDocumentTransaction(lg, txnNumber); } void TransactionParticipant::beginOrContinueTransactionUnconditionally(TxnNumber txnNumber) { stdx::lock_guard lg(_mutex); // Continuing transaction unconditionally is a no-op since we don't check any on-disk state. if (_activeTxnNumber != txnNumber) { _beginMultiDocumentTransaction(lg, txnNumber); } } void TransactionParticipant::setSpeculativeTransactionOpTime( OperationContext* opCtx, SpeculativeTransactionOpTime opTimeChoice) { stdx::lock_guard lg(_mutex); repl::ReplicationCoordinator* replCoord = repl::ReplicationCoordinator::get(opCtx->getClient()->getServiceContext()); opCtx->recoveryUnit()->setTimestampReadSource( opTimeChoice == SpeculativeTransactionOpTime::kAllCommitted ? 
RecoveryUnit::ReadSource::kAllCommittedSnapshot : RecoveryUnit::ReadSource::kLastAppliedSnapshot); opCtx->recoveryUnit()->preallocateSnapshot(); auto readTimestamp = opCtx->recoveryUnit()->getPointInTimeReadTimestamp(); invariant(readTimestamp); // Transactions do not survive term changes, so combining "getTerm" here with the // recovery unit timestamp does not cause races. _speculativeTransactionReadOpTime = {*readTimestamp, replCoord->getTerm()}; stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onChooseReadTimestamp(*readTimestamp); } TransactionParticipant::OplogSlotReserver::OplogSlotReserver(OperationContext* opCtx) { // Stash the transaction on the OperationContext on the stack. At the end of this function it // will be unstashed onto the OperationContext. TransactionParticipant::SideTransactionBlock sideTxn(opCtx); // Begin a new WUOW and reserve a slot in the oplog. WriteUnitOfWork wuow(opCtx); _oplogSlot = repl::getNextOpTime(opCtx); // Release the WUOW state since this WUOW is no longer in use. wuow.release(); // We must lock the Client to change the Locker on the OperationContext. stdx::lock_guard lk(*opCtx->getClient()); // The new transaction should have an empty locker, and thus we do not need to save it. invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); _locker = opCtx->swapLockState(stdx::make_unique()); // Inherit the locking setting from the original one. opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( _locker->shouldConflictWithSecondaryBatchApplication()); _locker->unsetThreadId(); // OplogSlotReserver is only used by primary, so always set max transaction lock timeout. invariant(opCtx->writesAreReplicated()); // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. 
auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); if (maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } // Save the RecoveryUnit from the new transaction and replace it with an empty one. _recoveryUnit = opCtx->releaseRecoveryUnit(); opCtx->setRecoveryUnit(std::unique_ptr( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); } TransactionParticipant::OplogSlotReserver::~OplogSlotReserver() { // If the constructor did not complete, we do not attempt to abort the units of work. if (_recoveryUnit) { // We should be at WUOW nesting level 1, only the top level WUOW for the oplog reservation // side transaction. _locker->endWriteUnitOfWork(); invariant(!_locker->inAWriteUnitOfWork()); _recoveryUnit->abortUnitOfWork(); } } TransactionParticipant::TxnResources::TxnResources(OperationContext* opCtx, bool keepTicket) { // We must lock the Client to change the Locker on the OperationContext. stdx::lock_guard lk(*opCtx->getClient()); _ruState = opCtx->getWriteUnitOfWork()->release(); opCtx->setWriteUnitOfWork(nullptr); _locker = opCtx->swapLockState(stdx::make_unique()); // Inherit the locking setting from the original one. opCtx->lockState()->setShouldConflictWithSecondaryBatchApplication( _locker->shouldConflictWithSecondaryBatchApplication()); if (!keepTicket) { _locker->releaseTicket(); } _locker->unsetThreadId(); // This thread must still respect the transaction lock timeout, since it can prevent the // transaction from making progress. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } // On secondaries, max lock timeout must not be set. 
invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout()); _recoveryUnit = opCtx->releaseRecoveryUnit(); opCtx->setRecoveryUnit(std::unique_ptr( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); _readConcernArgs = repl::ReadConcernArgs::get(opCtx); } TransactionParticipant::TxnResources::~TxnResources() { if (!_released && _recoveryUnit) { // This should only be reached when aborting a transaction that isn't active, i.e. // when starting a new transaction before completing an old one. So we should // be at WUOW nesting level 1 (only the top level WriteUnitOfWork). _locker->endWriteUnitOfWork(); invariant(!_locker->inAWriteUnitOfWork()); _recoveryUnit->abortUnitOfWork(); } } void TransactionParticipant::TxnResources::release(OperationContext* opCtx) { // Perform operations that can fail the release before marking the TxnResources as released. _locker->reacquireTicket(opCtx); invariant(!_released); _released = true; // We intentionally do not capture the return value of swapLockState(), which is just an empty // locker. At the end of the operation, if the transaction is not complete, we will stash the // operation context's locker and replace it with a new empty locker. // It is necessary to lock the client to change the Locker on the OperationContext. 
stdx::lock_guard lk(*opCtx->getClient()); invariant(opCtx->lockState()->getClientState() == Locker::ClientState::kInactive); opCtx->swapLockState(std::move(_locker)); opCtx->lockState()->updateThreadIdToCurrentThread(); auto oldState = opCtx->setRecoveryUnit(std::move(_recoveryUnit), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); invariant(oldState == WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork, str::stream() << "RecoveryUnit state was " << oldState); opCtx->setWriteUnitOfWork(WriteUnitOfWork::createForSnapshotResume(opCtx, _ruState)); auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); readConcernArgs = _readConcernArgs; } TransactionParticipant::SideTransactionBlock::SideTransactionBlock(OperationContext* opCtx) : _opCtx(opCtx) { if (_opCtx->getWriteUnitOfWork()) { _txnResources = TransactionParticipant::TxnResources(_opCtx, true /* keepTicket*/); } } TransactionParticipant::SideTransactionBlock::~SideTransactionBlock() { if (_txnResources) { // Restore the transaction state onto '_opCtx'. _txnResources->release(_opCtx); } } void TransactionParticipant::_stashActiveTransaction(WithLock, OperationContext* opCtx) { if (_inShutdown) { return; } invariant(_activeTxnNumber == opCtx->getTxnNumber()); { stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onStash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64()); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } invariant(!_txnResourceStash); _txnResourceStash = TxnResources(opCtx); } void TransactionParticipant::stashTransactionResources(OperationContext* opCtx) { if (opCtx->getClient()->isInDirectClient()) { return; } invariant(opCtx->getTxnNumber()); stdx::unique_lock lg(_mutex); // Always check session's txnNumber, since it can be modified by migration, which does not // check out the session. 
We intentionally do not error if _txnState=kAborted, since we // expect this function to be called at the end of the 'abortTransaction' command. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); if (!_txnState.inMultiDocumentTransaction(lg)) { // Not in a multi-document transaction: nothing to do. return; } _stashActiveTransaction(lg, opCtx); } void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx, const std::string& cmdName) { if (opCtx->getClient()->isInDirectClient()) { return; } invariant(opCtx->getTxnNumber()); { stdx::lock_guard lg(_mutex); // Always check session's txnNumber and '_txnState', since they can be modified by session // kill and migration, which do not check out the session. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); // If this is not a multi-document transaction, there is nothing to unstash. if (_txnState.isNone(lg)) { invariant(!_txnResourceStash); return; } _checkIsCommandValidWithTxnState(lg, opCtx, cmdName); if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the // stash to the operation context. auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", readConcernArgs.isEmpty()); _txnResourceStash->release(opCtx); _txnResourceStash = boost::none; stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64()); return; } // If we have no transaction resources then we cannot be prepared. If we're not in progress, // we don't do anything else. invariant(!_txnState.isPrepared(lg)); if (!_txnState.isInProgress(lg)) { // At this point we're either committed and this is a 'commitTransaction' command, or we // are in the process of committing. return; } // Stashed transaction resources do not exist for this in-progress multi-document // transaction. 
Set up the transaction resources on the opCtx. opCtx->setWriteUnitOfWork(std::make_unique(opCtx)); // If maxTransactionLockRequestTimeoutMillis is set, then we will ensure no // future lock request waits longer than maxTransactionLockRequestTimeoutMillis // to acquire a lock. This is to avoid deadlocks and minimize non-transaction // operation performance degradations. auto maxTransactionLockMillis = maxTransactionLockRequestTimeoutMillis.load(); if (opCtx->writesAreReplicated() && maxTransactionLockMillis >= 0) { opCtx->lockState()->setMaxLockTimeout(Milliseconds(maxTransactionLockMillis)); } // On secondaries, max lock timeout must not be set. invariant(opCtx->writesAreReplicated() || !opCtx->lockState()->hasMaxLockTimeout()); stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onUnstash(ServerTransactionsMetrics::get(opCtx), curTimeMicros64()); } // Storage engine transactions may be started in a lazy manner. By explicitly // starting here we ensure that a point-in-time snapshot is established during the // first operation of a transaction. // // Active transactions are protected by the locking subsystem, so we must always hold at least a // Global intent lock before starting a transaction. We pessimistically acquire an intent // exclusive lock here because we might be doing writes in this transaction, and it is currently // not deadlock-safe to upgrade IS to IX. Lock::GlobalLock(opCtx, MODE_IX); opCtx->recoveryUnit()->preallocateSnapshot(); // The Client lock must not be held when executing this failpoint as it will block currentOp // execution. 
if (MONGO_FAIL_POINT(hangAfterPreallocateSnapshot)) { CurOpFailpointHelpers::waitWhileFailPointEnabled( &hangAfterPreallocateSnapshot, opCtx, "hangAfterPreallocateSnapshot"); } } Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, boost::optional prepareOptime) { stdx::unique_lock lk(_mutex); // Always check session's txnNumber and '_txnState', since they can be modified by // session kill and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); _getSession()->lockTxnNumber( _activeTxnNumber, {ErrorCodes::PreparedTransactionInProgress, "cannot change transaction number while the session has a prepared transaction"}); ScopeGuard abortGuard = MakeGuard([&] { // Prepare transaction on secondaries should always succeed. invariant(!prepareOptime); if (lk.owns_lock()) { lk.unlock(); } abortActiveTransaction(opCtx); }); _txnState.transitionTo(lk, TransactionState::kPrepared); boost::optional oplogSlotReserver; OplogSlot prepareOplogSlot; if (prepareOptime) { // On secondary, we just prepare the transaction and discard the buffered ops. prepareOplogSlot = OplogSlot(*prepareOptime, 0); } else { // On primary, we reserve an optime, prepare the transaction and write the oplog entry. // // Reserve an optime for the 'prepareTimestamp'. This will create a hole in the oplog and // cause 'snapshot' and 'afterClusterTime' readers to block until this transaction is done // being prepared. When the OplogSlotReserver goes out of scope and is destroyed, the // storage-transaction it uses to keep the hole open will abort and the slot (and // corresponding oplog hole) will vanish. 
oplogSlotReserver.emplace(opCtx); prepareOplogSlot = oplogSlotReserver->getReservedOplogSlot(); invariant(_prepareOpTime.isNull(), str::stream() << "This transaction has already reserved a prepareOpTime at: " << _prepareOpTime.toString()); _prepareOpTime = prepareOplogSlot.opTime; if (MONGO_FAIL_POINT(hangAfterReservingPrepareTimestamp)) { // This log output is used in js tests so please leave it. log() << "transaction - hangAfterReservingPrepareTimestamp fail point " "enabled. Blocking until fail point is disabled. Prepare OpTime: " << prepareOplogSlot.opTime; MONGO_FAIL_POINT_PAUSE_WHILE_SET(hangAfterReservingPrepareTimestamp); } } opCtx->recoveryUnit()->setPrepareTimestamp(prepareOplogSlot.opTime.getTimestamp()); opCtx->getWriteUnitOfWork()->prepare(); // We need to unlock the session to run the opObserver onTransactionPrepare, which calls back // into the session. lk.unlock(); opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); abortGuard.Dismiss(); invariant(!_oldestOplogEntryTS, str::stream() << "This transaction's oldest oplog entry Timestamp has already " << "been set to: " << _oldestOplogEntryTS->toString()); // Keep track of the Timestamp from the first oplog entry written by this transaction. _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp(); // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently // only write an oplog entry for an in progress transaction when it is in the prepare state // but this will change when we allow multiple oplog entries per transaction. 
{ stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx), *_oldestOplogEntryTS); } return prepareOplogSlot.opTime.getTimestamp(); } void TransactionParticipant::addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation) { stdx::lock_guard lk(_mutex); // Always check _getSession()'s txnNumber and '_txnState', since they can be modified by session // kill and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); // Ensure that we only ever add operations to an in progress transaction. invariant(_txnState.isInProgress(lk), str::stream() << "Current state: " << _txnState); invariant(_autoCommit && !*_autoCommit && _activeTxnNumber != kUninitializedTxnNumber); invariant(opCtx->lockState()->inAWriteUnitOfWork()); _transactionOperations.push_back(operation); _transactionOperationBytes += repl::OplogEntry::getReplOperationSize(operation); // _transactionOperationBytes is based on the in-memory size of the operation. With overhead, // we expect the BSON size of the operation to be larger, so it's possible to make a transaction // just a bit too large and have it fail only in the commit. It's still useful to fail early // when possible (e.g. to avoid exhausting server memory). uassert(ErrorCodes::TransactionTooLarge, str::stream() << "Total size of all transaction operations must be less than " << BSONObjMaxInternalSize << ". Actual size is " << _transactionOperationBytes, _transactionOperationBytes <= BSONObjMaxInternalSize); } std::vector TransactionParticipant::endTransactionAndRetrieveOperations( OperationContext* opCtx) { stdx::lock_guard lk(_mutex); // Always check session's txnNumber and '_txnState', since they can be modified by session kill // and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); // Ensure that we only ever end a transaction when prepared or in progress. 
invariant(_txnState.isInSet(lk, TransactionState::kPrepared | TransactionState::kInProgress), str::stream() << "Current state: " << _txnState); invariant(_autoCommit); _transactionOperationBytes = 0; return std::move(_transactionOperations); } void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) { stdx::unique_lock lk(_mutex); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); uassert(ErrorCodes::InvalidOptions, "commitTransaction must provide commitTimestamp to prepared transaction.", !_txnState.isPrepared(lk)); // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. invariant(!_oldestOplogEntryTS, str::stream() << "The oldest oplog entry Timestamp should not have been set because " << "this transaction is not prepared. But, it is currently " << _oldestOplogEntryTS->toString()); // We need to unlock the session to run the opObserver onTransactionCommit, which calls back // into the session. lk.unlock(); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); opObserver->onTransactionCommit(opCtx, boost::none, boost::none); lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); // The oplog entry is written in the same WUOW with the data change for unprepared transactions. // We can still consider the state is InProgress until now, since no externally visible changes // have been made yet by the commit operation. If anything throws before this point in the // function, entry point will abort the transaction. 
_txnState.transitionTo(lk, TransactionState::kCommittingWithoutPrepare); lk.unlock(); _commitStorageTransaction(opCtx); lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), false); invariant(_txnState.isCommittingWithoutPrepare(lk), str::stream() << "Current State: " << _txnState); _finishCommitTransaction(lk, opCtx); } void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, Timestamp commitTimestamp) { stdx::unique_lock lk(_mutex); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); uassert(ErrorCodes::InvalidOptions, "commitTransaction cannot provide commitTimestamp to unprepared transaction.", _txnState.isPrepared(lk)); uassert( ErrorCodes::InvalidOptions, "'commitTimestamp' cannot be null", !commitTimestamp.isNull()); uassert(ErrorCodes::InvalidOptions, "'commitTimestamp' must be greater than or equal to 'prepareTimestamp'", commitTimestamp >= _prepareOpTime.getTimestamp()); _txnState.transitionTo(lk, TransactionState::kCommittingWithPrepare); opCtx->recoveryUnit()->setCommitTimestamp(commitTimestamp); try { // We reserve an oplog slot before committing the transaction so that no writes that are // causally related to the transaction commit enter the oplog at a timestamp earlier than // the commit oplog entry. OplogSlotReserver oplogSlotReserver(opCtx); const auto commitOplogSlot = oplogSlotReserver.getReservedOplogSlot(); invariant(commitOplogSlot.opTime.getTimestamp() >= commitTimestamp, str::stream() << "Commit oplog entry must be greater than or equal to commit " "timestamp due to causal consistency. commit timestamp: " << commitTimestamp.toBSON() << ", commit oplog entry optime: " << commitOplogSlot.opTime.toBSON()); // We need to unlock the session to run the opObserver onTransactionCommit, which calls back // into the session. We also do not want to write to storage with the mutex locked. 
lk.unlock(); _commitStorageTransaction(opCtx); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); opObserver->onTransactionCommit(opCtx, commitOplogSlot, commitTimestamp); lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); _finishCommitTransaction(lk, opCtx); _getSession()->unlockTxnNumber(); } catch (...) { // It is illegal for committing a prepared transaction to fail for any reason, other than an // invalid command, so we crash instead. severe() << "Caught exception during commit of prepared transaction " << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON() << ": " << exceptionToStatus(); std::terminate(); } } void TransactionParticipant::_commitStorageTransaction(OperationContext* opCtx) try { invariant(opCtx->getWriteUnitOfWork()); opCtx->getWriteUnitOfWork()->commit(); opCtx->setWriteUnitOfWork(nullptr); // We must clear the recovery unit and locker for the 'config.transactions' and oplog entry // writes. opCtx->setRecoveryUnit(std::unique_ptr( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); opCtx->lockState()->unsetMaxLockTimeout(); } catch (...) { // It is illegal for committing a storage-transaction to fail so we crash instead. severe() << "Caught exception during commit of storage-transaction " << opCtx->getTxnNumber() << " on " << _getSession()->getSessionId().toBSON() << ": " << exceptionToStatus(); std::terminate(); } void TransactionParticipant::_finishCommitTransaction(WithLock lk, OperationContext* opCtx) { // If no writes have been done, set the client optime forward to the read timestamp so waiting // for write concern will ensure all read data was committed. // // TODO(SERVER-34881): Once the default read concern is speculative majority, only set the // client optime forward if the original read concern level is "majority" or "snapshot". 
auto& clientInfo = repl::ReplClientInfo::forClient(opCtx->getClient()); if (_speculativeTransactionReadOpTime > clientInfo.getLastOp()) { clientInfo.setLastOp(_speculativeTransactionReadOpTime); } _txnState.transitionTo(lk, TransactionState::kCommitted); const auto curTime = curTimeMicros64(); { stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx), curTime, _oldestOplogEntryTS, &Top::get(getGlobalServiceContext())); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } // We must clear the recovery unit and locker so any post-transaction writes can run without // transactional settings such as a read timestamp. _cleanUpTxnResourceOnOpCtx(lk, opCtx, TransactionState::kCommitted); } void TransactionParticipant::shutdown() { stdx::lock_guard lock(_mutex); _inShutdown = true; _txnResourceStash = boost::none; } void TransactionParticipant::abortArbitraryTransaction() { stdx::lock_guard lock(_mutex); if (!_txnState.isInProgress(lock)) { // We do not want to abort transactions that are prepared unless we get an // 'abortTransaction' command. return; } _abortTransactionOnSession(lock); } void TransactionParticipant::abortArbitraryTransactionIfExpired() { stdx::lock_guard lock(_mutex); if (!_txnState.isInProgress(lock) || !_transactionExpireDate || _transactionExpireDate >= Date_t::now()) { return; } const auto session = _getSession(); auto currentOperation = session->getCurrentOperation(); if (currentOperation) { // If an operation is still running for this transaction when it expires, kill the currently // running operation. stdx::lock_guard clientLock(*currentOperation->getClient()); getGlobalServiceContext()->killOperation(currentOperation, ErrorCodes::ExceededTimeLimit); } // Log after killing the current operation because jstests may wait to see this log message to // imply that the operation has been killed. 
log() << "Aborting transaction with txnNumber " << _activeTxnNumber << " on session with lsid " << session->getSessionId().getId() << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; _abortTransactionOnSession(lock); } void TransactionParticipant::abortActiveTransaction(OperationContext* opCtx) { stdx::unique_lock lock(_mutex); // This function shouldn't throw if the transaction is already aborted. _checkIsActiveTransaction(lock, *opCtx->getTxnNumber(), false); _abortActiveTransaction( std::move(lock), opCtx, TransactionState::kInProgress | TransactionState::kPrepared); } void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction( OperationContext* opCtx) try { stdx::unique_lock lock(_mutex); if (_txnState.isInSet(lock, TransactionState::kNone)) { // If there is no active transaction, do nothing. return; } // We do this check to follow convention and maintain safety. If this were to throw we should // have returned in the check above. As a result, throwing here is fatal. _checkIsActiveTransaction(lock, *opCtx->getTxnNumber(), false); // Stash the transaction if it's in prepared state. if (_txnState.isInSet(lock, TransactionState::kPrepared)) { _stashActiveTransaction(lock, opCtx); return; } // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. invariant(!_oldestOplogEntryTS, str::stream() << "The oldest oplog entry Timestamp should not have been set because " << "this transaction is not prepared. But, it is currently " << _oldestOplogEntryTS->toString()); _abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress); } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. 
severe() << "Caught exception during transaction " << opCtx->getTxnNumber() << " abort or stash on " << _getSession()->getSessionId().toBSON() << " in state " << _txnState << ": " << exceptionToStatus(); std::terminate(); } void TransactionParticipant::_abortActiveTransaction(stdx::unique_lock lock, OperationContext* opCtx, TransactionState::StateSet expectedStates) { invariant(!_txnResourceStash); invariant(!_txnState.isCommittingWithPrepare(lock)); if (!_txnState.isNone(lock)) { stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } // We write the abort oplog entry before aborting the transaction so that no writes that are // causally related to the transaction aborting enter the oplog with a timestamp earlier // than the abort oplog entry's timestamp. This is required so that secondaries apply subsequent // operations on a document with a prepared update after the prepared update is aborted. // We need to unlock the mutex to run the opObserver onTransactionAbort, which calls back // into the TransactionParticipant. lock.unlock(); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); opObserver->onTransactionAbort(opCtx); lock.lock(); // We do not check if the active transaction number is correct here because we handle it below. // Only abort the transaction in session if it's in expected states. // When the state of active transaction on session is not expected, it means another // thread has already aborted the transaction on session. if (_txnState.isInSet(lock, expectedStates)) { invariant(opCtx->getTxnNumber() == _activeTxnNumber); _abortTransactionOnSession(lock); } else if (opCtx->getTxnNumber() == _activeTxnNumber) { if (_txnState.isNone(lock)) { // The active transaction is not a multi-document transaction. 
invariant(opCtx->getWriteUnitOfWork() == nullptr);
            // Nothing to clean up on the opCtx in this case, so return before the cleanup below.
            return;
        }

        // Cannot abort these states unless they are specified in expectedStates explicitly.
        const auto unabortableStates = TransactionState::kPrepared  //
            | TransactionState::kCommittingWithPrepare              //
            | TransactionState::kCommittingWithoutPrepare           //
            | TransactionState::kCommitted;                         //
        invariant(!_txnState.isInSet(lock, unabortableStates),
                  str::stream() << "Cannot abort transaction in " << _txnState.toString());
    } else {
        // If _activeTxnNumber is higher than ours, it means the transaction is already aborted.
        invariant(_txnState.isInSet(lock, TransactionState::kNone | TransactionState::kAborted));
    }

    // Clean up the transaction resources on the opCtx even if the transaction resources on the
    // session were not aborted. This actually aborts the storage-transaction.
    _cleanUpTxnResourceOnOpCtx(lock, opCtx, TransactionState::kAborted);
}

/**
 * Aborts the transaction state held on the session. The caller passes 'wl' as evidence that it
 * holds the lock.
 *
 * Abort metrics are recorded as "inactive" when a resource stash exists and as "active"
 * otherwise. The stash is then dropped, the buffered operations are cleared, the state moves to
 * kAborted, the tracked optimes are reset and the session's txnNumber is unlocked.
 */
void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
    // Taken once so that the metrics calls below all use the same timestamp.
    const auto curTime = curTimeMicros64();
    // If the transaction is stashed, then we have aborted an inactive transaction.
    if (_txnResourceStash) {
        // The transaction is stashed, so we abort the inactive transaction on session.
{
            // The metrics mutex is held only for the duration of the metrics update.
            stdx::lock_guard lm(_metricsMutex);
            _transactionMetricsObserver.onAbortInactive(
                ServerTransactionsMetrics::get(getGlobalServiceContext()),
                curTime,
                _oldestOplogEntryTS,
                &Top::get(getGlobalServiceContext()));
        }
        // The lock statistics come from the stashed locker, so this must run before the stash is
        // dropped on the next statement.
        _logSlowTransaction(wl,
                            &(_txnResourceStash->locker()->getLockerInfo(boost::none))->stats,
                            TransactionState::kAborted,
                            _txnResourceStash->getReadConcernArgs());
        _txnResourceStash = boost::none;
    } else {
        // No stash: the transaction being aborted is counted as an active one.
        stdx::lock_guard lm(_metricsMutex);
        _transactionMetricsObserver.onAbortActive(
            ServerTransactionsMetrics::get(getGlobalServiceContext()),
            curTime,
            _oldestOplogEntryTS,
            &Top::get(getGlobalServiceContext()));
    }

    // Reset the per-transaction bookkeeping held on this object.
    _transactionOperationBytes = 0;
    _transactionOperations.clear();
    _txnState.transitionTo(wl, TransactionState::kAborted);
    _prepareOpTime = repl::OpTime();
    _oldestOplogEntryTS = boost::none;
    _speculativeTransactionReadOpTime = repl::OpTime();
    _getSession()->unlockTxnNumber();
}

/**
 * Releases the transaction resources attached to 'opCtx' once the transaction has ended with
 * 'terminationCause'.
 *
 * The transaction is first offered to the slow-transaction logger using the opCtx's lock
 * statistics; any WriteUnitOfWork on 'opCtx' is then detached, and finally the recovery unit is
 * replaced and the max lock timeout removed.
 */
void TransactionParticipant::_cleanUpTxnResourceOnOpCtx(
    WithLock wl, OperationContext* opCtx, TransactionState::StateFlag terminationCause) {
    // Log the transaction if its duration is longer than the slowMS command threshold.
    _logSlowTransaction(
        wl,
        &(opCtx->lockState()->getLockerInfo(CurOp::get(*opCtx)->getLockStatsBase()))->stats,
        terminationCause,
        repl::ReadConcernArgs::get(opCtx));

    // Reset the WUOW. We should be able to abort empty transactions that don't have WUOW.
    if (opCtx->getWriteUnitOfWork()) {
        opCtx->setWriteUnitOfWork(nullptr);
    }

    // We must clear the recovery unit and locker so any post-transaction writes can run without
    // transactional settings such as a read timestamp.
opCtx->setRecoveryUnit(std::unique_ptr( opCtx->getServiceContext()->getStorageEngine()->newRecoveryUnit()), WriteUnitOfWork::RecoveryUnitState::kNotInUnitOfWork); opCtx->lockState()->unsetMaxLockTimeout(); } void TransactionParticipant::_checkIsActiveTransaction(WithLock wl, const TxnNumber& requestTxnNumber, bool checkAbort) const { const auto txnNumber = _getSession()->getActiveTxnNumber(); uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Cannot perform operations on active transaction " << _activeTxnNumber << " on session " << _getSession()->getSessionId() << " because a different transaction " << txnNumber << " is now active.", txnNumber == _activeTxnNumber); uassert(ErrorCodes::ConflictingOperationInProgress, str::stream() << "Cannot perform operations on requested transaction " << requestTxnNumber << " on session " << _getSession()->getSessionId() << " because a different transaction " << _activeTxnNumber << " is now active.", requestTxnNumber == _activeTxnNumber); uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << txnNumber << " has been aborted.", !checkAbort || !_txnState.isAborted(wl)); } void TransactionParticipant::_checkIsCommandValidWithTxnState(WithLock wl, OperationContext* opCtx, const std::string& cmdName) { // Throw NoSuchTransaction error instead of TransactionAborted error since this is the entry // point of transaction execution. uassert(ErrorCodes::NoSuchTransaction, str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", !_txnState.isAborted(wl)); // Cannot change committed transaction but allow retrying commitTransaction command. 
uassert(ErrorCodes::TransactionCommitted, str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been committed.", cmdName == "commitTransaction" || !_txnState.isCommitted(wl)); // Disallow operations other than abort, prepare or commit on a prepared transaction uassert(ErrorCodes::PreparedTransactionInProgress, str::stream() << "Cannot call any operation other than abort, prepare or commit on" << " a prepared transaction", !_txnState.isPrepared(wl) || preparedTxnCmdWhitelist.find(cmdName) != preparedTxnCmdWhitelist.cend()); } Status TransactionParticipant::isValid(StringData dbName, StringData cmdName) { if (cmdName == "count"_sd) { return {ErrorCodes::OperationNotSupportedInTransaction, "Cannot run 'count' in a multi-document transaction. Please see " "http://dochub.mongodb.org/core/transaction-count for a recommended alternative."}; } if (txnCmdWhitelist.find(cmdName) == txnCmdWhitelist.cend() && !(getTestCommandsEnabled() && txnCmdForTestingWhitelist.find(cmdName) != txnCmdForTestingWhitelist.cend())) { return {ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Cannot run '" << cmdName << "' in a multi-document transaction."}; } if (dbName == "config"_sd || dbName == "local"_sd || (dbName == "admin"_sd && txnAdminCommands.find(cmdName) == txnAdminCommands.cend())) { return {ErrorCodes::OperationNotSupportedInTransaction, str::stream() << "Cannot run command against the '" << dbName << "' database in a transaction"}; } return Status::OK(); } BSONObj TransactionParticipant::reportStashedState() const { BSONObjBuilder builder; reportStashedState(&builder); return builder.obj(); } void TransactionParticipant::reportStashedState(BSONObjBuilder* builder) const { stdx::lock_guard lm(_mutex); if (_txnResourceStash && _txnResourceStash->locker()) { if (auto lockerInfo = _txnResourceStash->locker()->getLockerInfo(boost::none)) { invariant(_activeTxnNumber != kUninitializedTxnNumber); builder->append("host", getHostNameCachedAndPort()); 
builder->append("desc", "inactive transaction");

            // Identify the client that last ran an operation in this transaction.
            const auto& lastClientInfo =
                _transactionMetricsObserver.getSingleTransactionStats().getLastClientInfo();
            builder->append("client", lastClientInfo.clientHostAndPort);
            builder->append("connectionId", lastClientInfo.connectionId);
            builder->append("appName", lastClientInfo.appName);
            builder->append("clientMetadata", lastClientInfo.clientMetadata);

            {
                BSONObjBuilder lsid(builder->subobjStart("lsid"));
                _getSession()->getSessionId().serialize(&lsid);
            }

            // The transaction statistics are built separately and attached as "transaction".
            BSONObjBuilder transactionBuilder;
            _reportTransactionStats(
                lm, &transactionBuilder, _txnResourceStash->getReadConcernArgs());
            builder->append("transaction", transactionBuilder.obj());
            builder->append("waitingForLock", false);
            builder->append("active", false);
            fillLockerInfo(*lockerInfo, *builder);
        }
    }
}

/**
 * Appends a "transaction" sub-document with this participant's transaction statistics to
 * 'builder', but only when the statistics show that the transaction is not stashed.
 * 'readConcernArgs' is forwarded to the statistics report.
 */
void TransactionParticipant::reportUnstashedState(repl::ReadConcernArgs readConcernArgs,
                                                  BSONObjBuilder* builder) const {
    stdx::lock_guard lm(_metricsMutex);

    // This method may only take the metrics mutex, as it is called with the Client mutex held. So
    // we cannot check the stashed state directly. Instead, a transaction is considered unstashed
    // if it is not actually a transaction (retryable write, no stash used), or is active (not
    // stashed), or has ended (any stash would be cleared).
const auto& singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats(); if (!singleTransactionStats.isForMultiDocumentTransaction() || singleTransactionStats.isActive() || singleTransactionStats.isEnded()) { BSONObjBuilder transactionBuilder; _reportTransactionStats(lm, &transactionBuilder, readConcernArgs); builder->append("transaction", transactionBuilder.obj()); } } std::string TransactionParticipant::TransactionState::toString(StateFlag state) { switch (state) { case TransactionParticipant::TransactionState::kNone: return "TxnState::None"; case TransactionParticipant::TransactionState::kInProgress: return "TxnState::InProgress"; case TransactionParticipant::TransactionState::kPrepared: return "TxnState::Prepared"; case TransactionParticipant::TransactionState::kCommittingWithoutPrepare: return "TxnState::CommittingWithoutPrepare"; case TransactionParticipant::TransactionState::kCommittingWithPrepare: return "TxnState::CommittingWithPrepare"; case TransactionParticipant::TransactionState::kCommitted: return "TxnState::Committed"; case TransactionParticipant::TransactionState::kAborted: return "TxnState::Aborted"; } MONGO_UNREACHABLE; } bool TransactionParticipant::TransactionState::_isLegalTransition(StateFlag oldState, StateFlag newState) { switch (oldState) { case kNone: switch (newState) { case kNone: case kInProgress: return true; default: return false; } MONGO_UNREACHABLE; case kInProgress: switch (newState) { case kNone: case kPrepared: case kCommittingWithoutPrepare: case kAborted: return true; default: return false; } MONGO_UNREACHABLE; case kPrepared: switch (newState) { case kCommittingWithPrepare: case kAborted: return true; default: return false; } MONGO_UNREACHABLE; case kCommittingWithPrepare: case kCommittingWithoutPrepare: switch (newState) { case kNone: case kCommitted: case kAborted: return true; default: return false; } MONGO_UNREACHABLE; case kCommitted: switch (newState) { case kNone: case kInProgress: return true; 
default: return false; } MONGO_UNREACHABLE; case kAborted: switch (newState) { case kNone: case kInProgress: return true; default: return false; } MONGO_UNREACHABLE; } MONGO_UNREACHABLE; } void TransactionParticipant::TransactionState::transitionTo(WithLock, StateFlag newState, TransitionValidation shouldValidate) { if (shouldValidate == TransitionValidation::kValidateTransition) { invariant(TransactionState::_isLegalTransition(_state, newState), str::stream() << "Current state: " << toString(_state) << ", Illegal attempted next state: " << toString(newState)); } _state = newState; } void TransactionParticipant::_reportTransactionStats(WithLock wl, BSONObjBuilder* builder, repl::ReadConcernArgs readConcernArgs) const { _transactionMetricsObserver.getSingleTransactionStats().report(builder, readConcernArgs); } void TransactionParticipant::_updateState(WithLock wl, const Session::RefreshState& newState) { if (newState.refreshCount <= _lastStateRefreshCount) { return; } _activeTxnNumber = newState.txnNumber; if (newState.isCommitted) { _txnState.transitionTo(wl, TransactionState::kCommitted, TransactionState::TransitionValidation::kRelaxTransitionValidation); } _lastStateRefreshCount = newState.refreshCount; } std::string TransactionParticipant::_transactionInfoForLog( const SingleThreadedLockStats* lockStats, TransactionState::StateFlag terminationCause, repl::ReadConcernArgs readConcernArgs) { invariant(lockStats); invariant(terminationCause == TransactionState::kCommitted || terminationCause == TransactionState::kAborted); StringBuilder s; // User specified transaction parameters. BSONObjBuilder parametersBuilder; BSONObjBuilder lsidBuilder(parametersBuilder.subobjStart("lsid")); _getSession()->getSessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); parametersBuilder.append("txnNumber", _activeTxnNumber); parametersBuilder.append("autocommit", _autoCommit ? 
*_autoCommit : true); readConcernArgs.appendInfo(¶metersBuilder); s << "parameters:" << parametersBuilder.obj().toString() << ","; s << " readTimestamp:" << _speculativeTransactionReadOpTime.getTimestamp().toString() << ","; auto singleTransactionStats = _transactionMetricsObserver.getSingleTransactionStats(); s << singleTransactionStats.getOpDebug()->additiveMetrics.report(); std::string terminationCauseString = terminationCause == TransactionState::kCommitted ? "committed" : "aborted"; s << " terminationCause:" << terminationCauseString; auto curTime = curTimeMicros64(); s << " timeActiveMicros:" << durationCount(singleTransactionStats.getTimeActiveMicros(curTime)); s << " timeInactiveMicros:" << durationCount(singleTransactionStats.getTimeInactiveMicros(curTime)); // Number of yields is always 0 in multi-document transactions, but it is included mainly to // match the format with other slow operation logging messages. s << " numYields:" << 0; // Aggregate lock statistics. BSONObjBuilder locks; lockStats->report(&locks); s << " locks:" << locks.obj().toString(); // Total duration of the transaction. s << " " << Milliseconds{static_cast(singleTransactionStats.getDuration(curTime)) / 1000}; return s.str(); } void TransactionParticipant::_logSlowTransaction(WithLock wl, const SingleThreadedLockStats* lockStats, TransactionState::StateFlag terminationCause, repl::ReadConcernArgs readConcernArgs) { // Only log multi-document transactions. if (!_txnState.isNone(wl)) { // Log the transaction if its duration is longer than the slowMS command threshold. 
if (_transactionMetricsObserver.getSingleTransactionStats().getDuration(curTimeMicros64()) > serverGlobalParams.slowMS * 1000ULL) { log(logger::LogComponent::kTransaction) << "transaction " << _transactionInfoForLog(lockStats, terminationCause, readConcernArgs); } } } void TransactionParticipant::checkForNewTxnNumber() { auto txnNumber = _getSession()->getActiveTxnNumber(); stdx::lock_guard lg(_mutex); if (txnNumber > _activeTxnNumber) { _setNewTxnNumber(lg, txnNumber); } } void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber) { invariant(!_txnState.isInSet( wl, TransactionState::kPrepared | TransactionState::kCommittingWithPrepare)); // Abort the existing transaction if it's not prepared, committed, or aborted. if (_txnState.isInProgress(wl)) { _abortTransactionOnSession(wl); } _activeTxnNumber = txnNumber; _txnState.transitionTo(wl, TransactionState::kNone); { stdx::lock_guard lm(_metricsMutex); _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); } _prepareOpTime = repl::OpTime(); _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; } } // namespace mongo