/* * Copyright (C) 2017 MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the GNU Affero General Public License, version 3, * as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, * but WITHOUT ANY WARRANTY; without even the implied warranty of * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the * GNU Affero General Public License for more details. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . * * As a special exception, the copyright holders give permission to link the * code of portions of this program with the OpenSSL library under certain * conditions as described in each individual source file and distribute * linked combinations including the program with the OpenSSL library. You * must comply with the GNU Affero General Public License in all respects for * all of the code used other than as permitted herein. If you modify file(s) * with this exception, you may extend this exception to your version of the * file(s), but you are not obligated to do so. If you do not wish to do so, * delete this exception statement from your version. If you delete this * exception statement from all source files in the program, then also delete * it in the license file. */ #pragma once #include #include "mongo/base/disallow_copying.h" #include "mongo/bson/timestamp.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/unordered_map.h" #include "mongo/util/concurrency/with_lock.h" namespace mongo { class OperationContext; class UpdateRequest; /** * A write through cache for the state of a particular session. All modifications to the underlying * session transactions collection must be performed through an object of this class. * * The cache state can be 'up-to-date' (it is in sync with the persistent contents) or 'needs * refresh' (in which case refreshFromStorageIfNeeded needs to be called in order to make it * up-to-date). */ class Session : public Decorable { MONGO_DISALLOW_COPYING(Session); public: using CommittedStatementTimestampMap = stdx::unordered_map; static const BSONObj kDeadEndSentinel; explicit Session(LogicalSessionId sessionId); const LogicalSessionId& getSessionId() const { return _sessionId; } struct RefreshState { long long refreshCount{0}; TxnNumber txnNumber{kUninitializedTxnNumber}; bool isCommitted{false}; }; /** * Blocking method, which loads the transaction state from storage if it has been marked as * needing refresh. * * In order to avoid the possibility of deadlock, this method must not be called while holding a * lock. */ void refreshFromStorageIfNeeded(OperationContext* opCtx); /** * Starts a new transaction on the session, or continues an already active transaction. In this * context, a "transaction" is a sequence of operations associated with a transaction number. * * Throws an exception if: * - An attempt is made to start a transaction with number less than the latest * transaction this session has seen. * - The session has been invalidated. * * In order to avoid the possibility of deadlock, this method must not be called while holding a * lock. This method must also be called after refreshFromStorageIfNeeded has been called. */ void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber); /** * Called after a write under the specified transaction completes while the node is a primary * and specifies the statement ids which were written. Must be called while the caller is still * in the write's WUOW. Updates the on-disk state of the session to match the specified * transaction/opTime and keeps the cached state in sync. * * 'txnState' is 'none' for retryable writes. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ void onWriteOpCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t lastStmtIdWriteDate, boost::optional txnState); /** * Helper function to begin a migration on a primary node. * * Returns whether the specified statement should be migrated at all or skipped. * * Not called with session checked out. */ bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId); /** * Called after an entry for the specified session and transaction has been written to the oplog * during chunk migration, while the node is still primary. Must be called while the caller is * still in the oplog write's WUOW. Updates the on-disk state of the session to match the * specified transaction/opTime and keeps the cached state in sync. * * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary * and doesn't require the session to be checked-out. * * Throws if the session has been invalidated or the active transaction number is newer than the * one specified. */ void onMigrateCompletedOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteOpTime, Date_t oplogLastStmtIdWriteDate); /** * Marks the session as requiring refresh. Used when the session state has been modified * externally, such as through a direct write to the transactions table. */ void invalidate(); /** * Returns the op time of the last committed write for this session and transaction. If no write * has completed yet, returns an empty timestamp. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; /** * Checks whether the given statementId for the specified transaction has already executed and * if so, returns the oplog entry which was generated by that write. If the statementId hasn't * executed, returns boost::none. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ boost::optional checkStatementExecuted(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId) const; /** * Checks whether the given statementId for the specified transaction has already executed * without fetching the oplog entry which was generated by that write. * * Must only be called with the session checked-out. * * Throws if the session has been invalidated or the active transaction number doesn't match. */ bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; TxnNumber getActiveTxnNumber() const { stdx::lock_guard lk(_mutex); return _activeTxnNumber; } /** * Returns a new oplog entry if the given entry has transaction state embedded within in. * The new oplog entry will contain the operation needed to replicate the transaction * table. * Returns boost::none if the given oplog doesn't have any transaction state or does not * support update to the transaction table. */ static boost::optional createMatchingTransactionTableUpdate( const repl::OplogEntry& entry); /** * Returns the state of the session from storage the last time a refresh occurred. */ boost::optional getLastRefreshState() const; /** * Attempt to lock the active TxnNumber of this session to the given number. This operation * can only succeed if it is equal to the current active TxnNumber. Also sets the error status * for any callers trying to modify the TxnNumber. */ void lockTxnNumber(const TxnNumber lockThisNumber, Status conflictError); /** * Release the lock on the active TxnNumber and allow it to be modified. */ void unlockTxnNumber(); private: void _beginOrContinueTxn(WithLock, TxnNumber txnNumber); // Checks if there is a conflicting operation on the current Session void _checkValid(WithLock) const; // Checks that a new txnNumber is higher than the activeTxnNumber so // we don't start a txn that is too old. void _checkTxnValid(WithLock, TxnNumber txnNumber) const; void _setActiveTxn(WithLock, TxnNumber txnNumber); void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const; boost::optional _checkStatementExecuted(WithLock, TxnNumber txnNumber, StmtId stmtId) const; // Returns the write date of the last committed write for this session and transaction. If no // write has completed yet, returns an empty date. // // Throws if the session has been invalidated or the active transaction number doesn't match. Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const; UpdateRequest _makeUpdateRequest(WithLock, TxnNumber newTxnNumber, const repl::OpTime& newLastWriteTs, Date_t newLastWriteDate, boost::optional newState) const; void _registerUpdateCacheOnCommit(OperationContext* opCtx, TxnNumber newTxnNumber, std::vector stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); const LogicalSessionId _sessionId; // Protects the member variables below. mutable stdx::mutex _mutex; // Specifies whether the session information needs to be refreshed from storage bool _isValid{false}; // Counter, incremented with each call to invalidate in order to discern invalidations, which // happen during refresh int _numInvalidations{0}; // Set to true if incomplete history is detected. For example, when the oplog to a write was // truncated because it was too old. bool _hasIncompleteHistory{false}; // Caches what is known to be the last written transaction record for the session boost::optional _lastWrittenSessionRecord; // Tracks the last seen txn number for the session and is always >= to the transaction number in // the last written txn record. When it is > than that in the last written txn record, this // means a new transaction has begun on the session, but it hasn't yet performed any writes. TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; // For the active txn, tracks which statement ids have been committed and at which oplog // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. CommittedStatementTimestampMap _activeTxnCommittedStatements; // Stores the state from last refresh. boost::optional _lastRefreshState; // True if txnNumber cannot be modified. bool _isTxnNumberLocked{false}; // The status to return when an operation tries to modify the active TxnNumber while it is // locked. boost::optional _txnNumberLockConflictStatus; }; } // namespace mongo