/** * Copyright (C) 2017 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/with_lock.h" namespace mongo { class OperationContext; class UpdateRequest; /** * A write through cache for the state of a particular session. All modifications to the underlying * session transactions collection must be performed through an object of this class. * * The cache state can be 'up-to-date' (it is in sync with the persistent contents) or 'needs * refresh' (in which case refreshFromStorageIfNeeded needs to be called in order to make it * up-to-date). */ class Session { MONGO_DISALLOW_COPYING(Session); public: /** * Holds state for a snapshot read or multi-statement transaction in between network operations. */ class TxnResources { public: /** * Stashes transaction state from 'opCtx' in the newly constructed TxnResources. */ TxnResources(OperationContext* opCtx); ~TxnResources(); // Rule of 5: because we have a class-defined destructor, we need to explictly specify // the move operator and move assignment operator. TxnResources(TxnResources&&) = default; TxnResources& operator=(TxnResources&&) = default; /** * Returns a const pointer to the stashed lock state, or nullptr if no stashed locks exist. */ const Locker* locker() const { return _locker.get(); } /** * Releases stashed transaction state onto 'opCtx'. Must only be called once. */ void release(OperationContext* opCtx); private: bool _released = false; std::unique_ptr _locker; std::unique_ptr _recoveryUnit; repl::ReadConcernArgs _readConcernArgs; }; using CommittedStatementTimestampMap = stdx::unordered_map; static const BSONObj kDeadEndSentinel; explicit Session(LogicalSessionId sessionId); const LogicalSessionId& getSessionId() const { return _sessionId; } /** * Blocking method, which loads the transaction state from storage if it has been marked as * needing refresh. * * In order to avoid the possibility of deadlock, this method must not be called while holding a * lock. */ void refreshFromStorageIfNeeded(OperationContext* opCtx); /** * Starts a new transaction on the session, or continues an already active transaction. In this * context, a "transaction" is a sequence of operations associated with a transaction number. * This sequence of operations could be a retryable write or multi-statement transaction. Both * utilize this method. * * The 'autocommit' argument represents the value of the field given in the original client * request. If it is boost::none, no autocommit parameter was passed into the request. Every * operation that is part of a multi statement transaction must specify 'autocommit=false'. * 'startTransaction' represents the value of the field given in the original client request, * and indicates whether this operation is the beginning of a multi-statement transaction. * * Throws an exception if: * - An attempt is made to start a transaction with number less than the latest * transaction this session has seen. * - The session has been invalidated. * - The values of 'autocommit' and/or 'startTransaction' are inconsistent with the current * state of the transaction. * * In order to avoid the possibility of deadlock, this method must not be called while holding a * lock. This method must also be called after refreshFromStorageIfNeeded has been called. */ void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, boost::optional autocommit, boost::optional startTransaction); /** * Similar to beginOrContinueTxn except it is used specifically for shard migrations and does * not check or modify the autocommit parameter. */ void beginOrContinueTxnOnMigration(OperationContext* opCtx, TxnNumber txnNumber); /** * Called after a write under the specified transaction completes while the node is a primary * and specifies the statement ids which were written. Must be called while the caller is still * in the write's WUOW. Updates the on-disk state of the session to match the specified * transaction/opTime and keeps the cached state in sync. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ void onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate); /** * Helper function to begin a migration on a primary node. * * Returns whether the specified statement should be migrated at all or skipped. */ bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId); /** * Called after an entry for the specified session and transaction has been written to the oplog * during chunk migration, while the node is still primary. Must be called while the caller is * still in the oplog write's WUOW. Updates the on-disk state of the session to match the * specified transaction/opTime and keeps the cached state in sync. * * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary * and doesn't require the session to be checked-out. * * Throws if the session has been invalidated or the active transaction number is newer than the * one specified. */ void onMigrateCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate); /** * Marks the session as requiring refresh. Used when the session state has been modified * externally, such as through a direct write to the transactions table. */ void invalidate(); /** * Returns the op time of the last committed write for this session and transaction. If no write * has completed yet, returns an empty timestamp. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; /** * Checks whether the given statementId for the specified transaction has already executed and * if so, returns the oplog entry which was generated by that write. If the statementId hasn't * executed, returns boost::none. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ boost::optional checkStatementExecuted(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) const; /** * Checks whether the given statementId for the specified transaction has already executed * without fetching the oplog entry which was generated by that write. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; /** * Transfers management of transaction resources from the OperationContext to the Session. */ void stashTransactionResources(OperationContext* opCtx); /** * Transfers management of transaction resources from the Session to the OperationContext. */ void unstashTransactionResources(OperationContext* opCtx); /** * Commits the transaction, including committing the write unit of work and updating * transaction state. */ void commitTransaction(OperationContext* opCtx); /** * Aborts the transaction outside the transaction, releasing transaction resources. */ void abortArbitraryTransaction(); /* * Aborts the transaction inside the transaction, releasing transaction resources. * We're inside the transaction when we have the Session checked out and 'opCtx' owns the * transaction resources. */ void abortActiveTransaction(OperationContext* opCtx); bool getAutocommit() const { return _autocommit; } /** * Returns whether we are in a multi-document transaction, which means we have an active * transaction which has autoCommit:false and has not been committed or aborted. */ bool inMultiDocumentTransaction() const { stdx::lock_guard lk(_mutex); return _txnState == MultiDocumentTransactionState::kInProgress; }; /** * Returns whether we are in a read-only or multi-document transaction. */ bool inSnapshotReadOrMultiDocumentTransaction() const { stdx::lock_guard lk(_mutex); return _txnState == MultiDocumentTransactionState::kInProgress || _txnState == MultiDocumentTransactionState::kInSnapshotRead; } bool transactionIsCommitted() const { stdx::lock_guard lk(_mutex); return _txnState == MultiDocumentTransactionState::kCommitted; } bool transactionIsAborted() const { stdx::lock_guard lk(_mutex); return _txnState == MultiDocumentTransactionState::kAborted; } /** * Adds a stored operation to the list of stored operations for the current multi-document * (non-autocommit) transaction. It is illegal to add operations when no multi-document * transaction is in progress. */ void addTransactionOperation(OperationContext* opCtx, const repl::ReplOperation& operation); /** * Returns and clears the stored operations for an multi-document (non-autocommit) transaction, * and marks the transaction as closed. It is illegal to attempt to add operations to the * transaction after this is called. */ std::vector endTransactionAndRetrieveOperations(OperationContext* opCtx); const std::vector& transactionOperationsForTest() { return _transactionOperations; } TxnNumber getActiveTxnNumberForTest() const { stdx::lock_guard lk(_mutex); return _activeTxnNumber; } /** * If this session is holding stashed locks in _txnResourceStash, reports the current state of * the session using the provided builder. Locks the session object's mutex while running. */ void reportStashedState(BSONObjBuilder* builder) const; /** * Convenience method which creates and populates a BSONObj containing the stashed state. * Returns an empty BSONObj if this session has no stashed resources. */ BSONObj reportStashedState() const; /** * Scan through the list of operations and add new oplog entries for updating * config.transactions if needed. */ static std::vector addOpsForReplicatingTxnTable( const std::vector& ops); /** * Returns a new oplog entry if the given entry has transaction state embedded within in. * The new oplog entry will contain the operation needed to replicate the transaction * table. * Returns boost::none if the given oplog doesn't have any transaction state or does not * support update to the transaction table. */ static boost::optional createMatchingTransactionTableUpdate( const repl::OplogEntry& entry); private: void _beginOrContinueTxn(WithLock, TxnNumber txnNumber, boost::optional autocommit, boost::optional startTransaction); void _beginOrContinueTxnOnMigration(WithLock, TxnNumber txnNumber); // Checks if there is a conflicting operation on the current Session void _checkValid(WithLock) const; // Checks that a new txnNumber is higher than the activeTxnNumber so // we don't start a txn that is too old. void _checkTxnValid(WithLock, TxnNumber txnNumber) const; void _setActiveTxn(WithLock, TxnNumber txnNumber); void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber, bool checkAbort) const; boost::optional _checkStatementExecuted(WithLock, TxnNumber txnNumber, StmtId stmtId) const; UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, const repl::OpTime& newLastWriteTs, Date_t newLastWriteDate) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); // Releases stashed transaction resources to abort the transaction. void _abortTransaction(WithLock); // Committing a transaction first changes its state to "Committing" and writes to the oplog, // then it changes the state to "Committed". // // When a transaction is in "Committing" state, it's not allowed for other threads to change its // state (i.e. abort the transaction), otherwise the on-disk state will diverge from the // in-memory state. // There are 3 cases where the transaction will be aborted. // 1) abortTransaction command. Session check-out mechanism only allows one client to access a // transaction. // 2) killSession, stepdown, transaction timeout and any thread that aborts the transaction // outside of session checkout. They can safely skip the committing transactions. // 3) Migration. Should be able to skip committing transactions. void _commitTransaction(stdx::unique_lock lk, OperationContext* opCtx); const LogicalSessionId _sessionId; // Protects the member variables below. mutable stdx::mutex _mutex; // Condition variable notified when we finish an attempt to commit the global WUOW. stdx::condition_variable _commitcv; // Specifies whether the session information needs to be refreshed from storage bool _isValid{false}; // Counter, incremented with each call to invalidate in order to discern invalidations, which // happen during refresh int _numInvalidations{0}; // Set to true if incomplete history is detected. For example, when the oplog to a write was // truncated because it was too old. bool _hasIncompleteHistory{false}; // Caches what is known to be the last written transaction record for the session boost::optional _lastWrittenSessionRecord; // Tracks the last seen txn number for the session and is always >= to the transaction number in // the last written txn record. When it is > than that in the last written txn record, this // means a new transaction has begun on the session, but it hasn't yet performed any writes. TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; // Holds transaction resources between network operations. boost::optional _txnResourceStash; // Indicates the state of the current multi-document transaction or snapshot read, if any. If // the transaction is in any state but kInProgress, no more operations can be collected. enum class MultiDocumentTransactionState { kNone, kInSnapshotRead, kInProgress, kCommitting, kCommitted, kAborted } _txnState = MultiDocumentTransactionState::kNone; // Holds oplog data for operations which have been applied in the current multi-document // transaction. Not used for retryable writes. std::vector _transactionOperations; // For the active txn, tracks which statement ids have been committed and at which oplog // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. CommittedStatementTimestampMap _activeTxnCommittedStatements; // Set in _beginOrContinueTxn and applies to the activeTxn on the session. bool _autocommit{true}; }; } // namespace mongo