diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-09-27 06:05:54 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-10-08 02:38:48 -0400 |
commit | b272bf351c39677d1e87d5c7fcd8b15b61465012 (patch) | |
tree | 9a92c09de2c9eb4244ca4b97d320f9d1e70637af /src/mongo/db/transaction_participant.h | |
parent | 07066a49b935a538ed54716fdd9a98d40c31fba4 (diff) | |
download | mongo-b272bf351c39677d1e87d5c7fcd8b15b61465012.tar.gz |
SERVER-36799 Move all transactions and retryable writes functionality from Session into TransactionParticipant
This change leaves the Session class as a plain decorable structure,
used only for serializing operations on the same logical session.
Diffstat (limited to 'src/mongo/db/transaction_participant.h')
-rw-r--r-- | src/mongo/db/transaction_participant.h | 220 |
1 files changed, 195 insertions, 25 deletions
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 9bdefa70dec..29e9b3e8bfe 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -36,15 +36,18 @@ #include "mongo/db/concurrency/locker.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/multi_key_path_tracker.h" +#include "mongo/db/ops/update_request.h" #include "mongo/db/repl/oplog.h" +#include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/read_concern_args.h" #include "mongo/db/session.h" +#include "mongo/db/session_txn_record_gen.h" #include "mongo/db/single_transaction_stats.h" #include "mongo/db/storage/recovery_unit.h" #include "mongo/db/transaction_metrics_observer.h" +#include "mongo/stdx/unordered_map.h" #include "mongo/util/assert_util.h" #include "mongo/util/concurrency/with_lock.h" -#include "mongo/util/decorable.h" #include "mongo/util/mongoutils/str.h" namespace mongo { @@ -139,6 +142,10 @@ public: OperationContext* _opCtx; }; + using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>; + + static const BSONObj kDeadEndSentinel; + TransactionParticipant() = default; /** @@ -353,22 +360,44 @@ public: } /** - * Starts a new transaction, or continues an already active transaction. + * Starts a new transaction (and if the txnNumber is newer aborts any in-progress transaction on + * the session), or continues an already active transaction. + * + * 'autocommit' comes from the 'autocommit' field in the original client request. The only valid + * values are boost::none (meaning no autocommit was specified) and false (meaning that this is + * the beginning of a multi-statement transaction). + * + * 'startTransaction' comes from the 'startTransaction' field in the original client request. + * See below for the acceptable values and the meaning of the combinations of autocommit and + * startTransaction. 
* - * The 'autocommit' argument represents the value of the field given in the original client - * request. If it is boost::none, no autocommit parameter was passed into the request. Every - * operation that is part of a multi statement transaction must specify 'autocommit=false'. - * 'startTransaction' represents the value of the field given in the original client request, - * and indicates whether this operation is the beginning of a multi-statement transaction. + * autocommit = boost::none, startTransaction = boost::none: Means retryable write + * autocommit = false, startTransaction = boost::none: Means continuation of a multi-statement + * transaction + * autocommit = false, startTransaction = true: Means abort whatever transaction is in progress + * on the session and start a new transaction * - * Throws an exception if: - * - The values of 'autocommit' and/or 'startTransaction' are inconsistent with the current - * state of the transaction. + * Any combination other than the ones listed above will invariant since it is expected that the + * caller has performed the necessary customer input validations. + * + * Exceptions of note, which can be thrown are: + * - TransactionTooOld - if attempt is made to start a transaction older than the currently + * active one or the last one which committed + * - PreparedTransactionInProgress - if the transaction is in the prepared state and a new + * transaction or retryable write is attempted */ void beginOrContinue(TxnNumber txnNumber, boost::optional<bool> autocommit, boost::optional<bool> startTransaction); + /** + * Used only by the secondary oplog application logic. Equivalent to 'beginOrContinue(txnNumber, + * false, true)' without performing any checks for whether the new txnNumber will start a + * transaction number in the past. + * + * NOTE: This method assumes that there are no concurrent users of the transaction since it + * unconditionally changes the active transaction on the session. 
+ */ void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber); void transitionToPreparedforTest() { @@ -382,10 +411,101 @@ public: } /** - * Checks to see if the txnNumber changed in the parent session and perform the necessary - * cleanup. + * Blocking method, which loads the transaction state from storage if it has been marked as + * needing refresh. + * + * In order to avoid the possibility of deadlock, this method must not be called while holding a + * lock. + */ + void refreshFromStorageIfNeeded(OperationContext* opCtx); + + TxnNumber getActiveTxnNumber() const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _activeTxnNumber; + } + + /** + * Called after a write under the specified transaction completes while the node is a primary + * and specifies the statement ids which were written. Must be called while the caller is still + * in the write's WUOW. Updates the on-disk state of the session to match the specified + * transaction/opTime and keeps the cached state in sync. + * + * 'txnState' is 'none' for retryable writes. + * + * Must only be called with the session checked-out. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + void onWriteOpCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t lastStmtIdWriteDate, + boost::optional<DurableTxnStateEnum> txnState); + + /** + * Helper function to begin a migration on a primary node. + * + * Returns whether the specified statement should be migrated at all or skipped. + * + * Not called with session checked out. + */ + bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId); + + /** + * Called after an entry for the specified session and transaction has been written to the oplog + * during chunk migration, while the node is still primary. 
Must be called while the caller is + * still in the oplog write's WUOW. Updates the on-disk state of the session to match the + * specified transaction/opTime and keeps the cached state in sync. + * + * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary + * and doesn't require the session to be checked-out. + * + * Throws if the session has been invalidated or the active transaction number is newer than the + * one specified. + */ + void onMigrateCompletedOnPrimary(OperationContext* opCtx, + TxnNumber txnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteOpTime, + Date_t oplogLastStmtIdWriteDate); + + /** + * Marks the session as requiring refresh. Used when the session state has been modified + * externally, such as through a direct write to the transactions table. + */ + void invalidate(); + + /** + * Returns the op time of the last committed write for this session and transaction. If no write + * has completed yet, returns an empty timestamp. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const; + + /** + * Checks whether the given statementId for the specified transaction has already executed and + * if so, returns the oplog entry which was generated by that write. If the statementId hasn't + * executed, returns boost::none. + * + * Must only be called with the session checked-out. + * + * Throws if the session has been invalidated or the active transaction number doesn't match. + */ + boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx, + TxnNumber txnNumber, + StmtId stmtId) const; + + /** + * Checks whether the given statementId for the specified transaction has already executed + * without fetching the oplog entry which was generated by that write. + * + * Must only be called with the session checked-out. 
+ * + * Throws if the session has been invalidated or the active transaction number doesn't match. */ - void checkForNewTxnNumber(); + bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; private: /** @@ -503,6 +623,43 @@ private: return (s << txnState.toString()); } + // Shortcut to obtain the id of the session under which this participant runs + const LogicalSessionId& _sessionId() const; + + /** + * Performing any checks based on the in-memory state of the TransactionParticipant requires + * that the object is fully in sync with its on-disk representation in the transactions table. + * This method checks that. The object can be out of sync with the on-disk representation either + * when it was just created, or after invalidate() was called (which typically happens after a + * direct write to the transactions table). + */ + void _checkValid(WithLock) const; + + // Checks that the specified transaction number is the same as the activeTxnNumber. Effectively + // a check that the caller operates on the transaction it thinks it is operating on. + void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const; + + boost::optional<repl::OpTime> _checkStatementExecuted(WithLock, + TxnNumber txnNumber, + StmtId stmtId) const; + + // Returns the write date of the last committed write for this session and transaction. If no + // write has completed yet, returns an empty date. + // + // Throws if the session has been invalidated or the active transaction number doesn't match. 
+ Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const; + + UpdateRequest _makeUpdateRequest(WithLock, + TxnNumber newTxnNumber, + const repl::OpTime& newLastWriteOpTime, + Date_t newLastWriteDate, + boost::optional<DurableTxnStateEnum> newState) const; + + void _registerUpdateCacheOnCommit(OperationContext* opCtx, + TxnNumber newTxnNumber, + std::vector<StmtId> stmtIdsWritten, + const repl::OpTime& lastStmtIdWriteTs); + // Finishes committing the multi-document transaction after the storage-transaction has been // committed, the oplog entry has been inserted into the oplog, and the transactions table has // been updated. @@ -562,8 +719,6 @@ private: BSONObjBuilder* builder, repl::ReadConcernArgs readConcernArgs) const; - void _updateState(WithLock wl, const Session::RefreshState& newState); - // Bumps up the transaction number of this transaction and perform the necessary cleanup. void _setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber); @@ -577,10 +732,6 @@ private: // number. void _continueMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber); - // Returns the session that this transaction belongs to. - const Session* _getSession() const; - Session* _getSession(); - // Protects the member variables below. mutable stdx::mutex _mutex; @@ -599,10 +750,9 @@ private: // Total size in bytes of all operations within the _transactionOperations vector. size_t _transactionOperationBytes = 0; - // This is the txnNumber that this transaction is actively working on. It can be different from - // the current txnNumber of the parent session (since it can be changed in couple of ways, like - // migration). In which case, it should make the necessary steps to also bump this number, like - // aborting the current transaction. + // Tracks the last seen txn number for the session and is always >= to the transaction number in + // the last written txn record. 
When it is > than that in the last written txn record, this + // means a new transaction has begun on the session, but it hasn't yet performed any writes. TxnNumber _activeTxnNumber{kUninitializedTxnNumber}; // Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long @@ -625,8 +775,28 @@ private: std::vector<MultikeyPathInfo> _multikeyPathInfo; - // Remembers the refresh count this object has read from Session. - long long _lastStateRefreshCount{0}; + // + // Retryable writes state + // + + // Specifies whether the session information needs to be refreshed from storage + bool _isValid{false}; + + // Counter, incremented with each call to invalidate in order to discern invalidations, which + // happen during refresh + int _numInvalidations{0}; + + // Set to true if incomplete history is detected. For example, when the oplog to a write was + // truncated because it was too old. + bool _hasIncompleteHistory{false}; + + // Caches what is known to be the last written transaction record for the session + boost::optional<SessionTxnRecord> _lastWrittenSessionRecord; + + // For the active txn, tracks which statement ids have been committed and at which oplog + // opTime. Used for fast retryability check and retrieving the previous write's data without + // having to scan through the oplog. + CommittedStatementTimestampMap _activeTxnCommittedStatements; // Protects _transactionMetricsObserver. The concurrency rules are that const methods on // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for |