summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant.h
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-09-27 06:05:54 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-08 02:38:48 -0400
commitb272bf351c39677d1e87d5c7fcd8b15b61465012 (patch)
tree9a92c09de2c9eb4244ca4b97d320f9d1e70637af /src/mongo/db/transaction_participant.h
parent07066a49b935a538ed54716fdd9a98d40c31fba4 (diff)
downloadmongo-b272bf351c39677d1e87d5c7fcd8b15b61465012.tar.gz
SERVER-36799 Move all transactions and retryable writes functionality from Session into TransactionParticipant
This change leaves the Session class to be a plain decorable structure only used for serialization of operations on the same logical session.
Diffstat (limited to 'src/mongo/db/transaction_participant.h')
-rw-r--r--src/mongo/db/transaction_participant.h220
1 files changed, 195 insertions, 25 deletions
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 9bdefa70dec..29e9b3e8bfe 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -36,15 +36,18 @@
#include "mongo/db/concurrency/locker.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/multi_key_path_tracker.h"
+#include "mongo/db/ops/update_request.h"
#include "mongo/db/repl/oplog.h"
+#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/db/session.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/single_transaction_stats.h"
#include "mongo/db/storage/recovery_unit.h"
#include "mongo/db/transaction_metrics_observer.h"
+#include "mongo/stdx/unordered_map.h"
#include "mongo/util/assert_util.h"
#include "mongo/util/concurrency/with_lock.h"
-#include "mongo/util/decorable.h"
#include "mongo/util/mongoutils/str.h"
namespace mongo {
@@ -139,6 +142,10 @@ public:
OperationContext* _opCtx;
};
+ using CommittedStatementTimestampMap = stdx::unordered_map<StmtId, repl::OpTime>;
+
+ static const BSONObj kDeadEndSentinel;
+
TransactionParticipant() = default;
/**
@@ -353,22 +360,44 @@ public:
}
/**
- * Starts a new transaction, or continues an already active transaction.
+ * Starts a new transaction (and if the txnNumber is newer aborts any in-progress transaction on
+ * the session), or continues an already active transaction.
+ *
+ * 'autocommit' comes from the 'autocommit' field in the original client request. The only valid
+ * values are boost::none (meaning no autocommit was specified) and false (meaning that this is
+ * the beginning of a multi-statement transaction).
+ *
+ * 'startTransaction' comes from the 'startTransaction' field in the original client request.
+ * See below for the acceptable values and the meaning of the combinations of autocommit and
+ * startTransaction.
*
- * The 'autocommit' argument represents the value of the field given in the original client
- * request. If it is boost::none, no autocommit parameter was passed into the request. Every
- * operation that is part of a multi statement transaction must specify 'autocommit=false'.
- * 'startTransaction' represents the value of the field given in the original client request,
- * and indicates whether this operation is the beginning of a multi-statement transaction.
+ * autocommit = boost::none, startTransaction = boost::none: Means retryable write
+ * autocommit = false, startTransaction = boost::none: Means continuation of a multi-statement
+ * transaction
+ * autocommit = false, startTransaction = true: Means abort whatever transaction is in progress
+ * on the session and start a new transaction
*
- * Throws an exception if:
- * - The values of 'autocommit' and/or 'startTransaction' are inconsistent with the current
- * state of the transaction.
+ * Any combination other than the ones listed above will invariant since it is expected that the
+ * caller has performed the necessary customer input validations.
+ *
+ * Exceptions of note, which can be thrown are:
+ * - TransactionTooOld - if attempt is made to start a transaction older than the currently
+ * active one or the last one which committed
+ * - PreparedTransactionInProgress - if the transaction is in the prepared state and a new
+ * transaction or retryable write is attempted
*/
void beginOrContinue(TxnNumber txnNumber,
boost::optional<bool> autocommit,
boost::optional<bool> startTransaction);
+ /**
+ * Used only by the secondary oplog application logic. Equivalent to 'beginOrContinue(txnNumber,
+ * false, true)' without performing any checks for whether the new txnNumber will start a
+ * transaction number in the past.
+ *
+ * NOTE: This method assumes that there are no concurrent users of the transaction since it
+ * unconditionally changes the active transaction on the session.
+ */
void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber);
void transitionToPreparedforTest() {
@@ -382,10 +411,101 @@ public:
}
/**
- * Checks to see if the txnNumber changed in the parent session and perform the necessary
- * cleanup.
+ * Blocking method, which loads the transaction state from storage if it has been marked as
+ * needing refresh.
+ *
+ * In order to avoid the possibility of deadlock, this method must not be called while holding a
+ * lock.
+ */
+ void refreshFromStorageIfNeeded(OperationContext* opCtx);
+
+ TxnNumber getActiveTxnNumber() const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _activeTxnNumber;
+ }
+
+ /**
+ * Called after a write under the specified transaction completes while the node is a primary
+ * and specifies the statement ids which were written. Must be called while the caller is still
+ * in the write's WUOW. Updates the on-disk state of the session to match the specified
+ * transaction/opTime and keeps the cached state in sync.
+ *
+ * 'txnState' is 'none' for retryable writes.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ void onWriteOpCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t lastStmtIdWriteDate,
+ boost::optional<DurableTxnStateEnum> txnState);
+
+ /**
+ * Helper function to begin a migration on a primary node.
+ *
+ * Returns whether the specified statement should be migrated at all or skipped.
+ *
+ * Not called with session checked out.
+ */
+ bool onMigrateBeginOnPrimary(OperationContext* opCtx, TxnNumber txnNumber, StmtId stmtId);
+
+ /**
+ * Called after an entry for the specified session and transaction has been written to the oplog
+ * during chunk migration, while the node is still primary. Must be called while the caller is
+ * still in the oplog write's WUOW. Updates the on-disk state of the session to match the
+ * specified transaction/opTime and keeps the cached state in sync.
+ *
+ * May be called concurrently with onWriteOpCompletedOnPrimary or onMigrateCompletedOnPrimary
+ * and doesn't require the session to be checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number is newer than the
+ * one specified.
+ */
+ void onMigrateCompletedOnPrimary(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteOpTime,
+ Date_t oplogLastStmtIdWriteDate);
+
+ /**
+ * Marks the session as requiring refresh. Used when the session state has been modified
+ * externally, such as through a direct write to the transactions table.
+ */
+ void invalidate();
+
+ /**
+ * Returns the op time of the last committed write for this session and transaction. If no write
+ * has completed yet, returns an empty timestamp.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ repl::OpTime getLastWriteOpTime(TxnNumber txnNumber) const;
+
+ /**
+ * Checks whether the given statementId for the specified transaction has already executed and
+ * if so, returns the oplog entry which was generated by that write. If the statementId hasn't
+ * executed, returns boost::none.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
+ */
+ boost::optional<repl::OplogEntry> checkStatementExecuted(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ StmtId stmtId) const;
+
+ /**
+ * Checks whether the given statementId for the specified transaction has already executed
+ * without fetching the oplog entry which was generated by that write.
+ *
+ * Must only be called with the session checked-out.
+ *
+ * Throws if the session has been invalidated or the active transaction number doesn't match.
*/
- void checkForNewTxnNumber();
+ bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const;
private:
/**
@@ -503,6 +623,43 @@ private:
return (s << txnState.toString());
}
+ // Shortcut to obtain the id of the session under which this participant runs
+ const LogicalSessionId& _sessionId() const;
+
+ /**
+ * Performing any checks based on the in-memory state of the TransactionParticipant requires
+ * that the object is fully in sync with its on-disk representation in the transactions table.
+ * This method checks that. The object can be out of sync with the on-disk representation either
+ * when it was just created, or after invalidate() was called (which typically happens after a
+ * direct write to the transactions table).
+ */
+ void _checkValid(WithLock) const;
+
+ // Checks that the specified transaction number is the same as the activeTxnNumber. Effectively
+ // a check that the caller operates on the transaction it thinks it is operating on.
+ void _checkIsActiveTransaction(WithLock, TxnNumber txnNumber) const;
+
+ boost::optional<repl::OpTime> _checkStatementExecuted(WithLock,
+ TxnNumber txnNumber,
+ StmtId stmtId) const;
+
+ // Returns the write date of the last committed write for this session and transaction. If no
+ // write has completed yet, returns an empty date.
+ //
+ // Throws if the session has been invalidated or the active transaction number doesn't match.
+ Date_t _getLastWriteDate(WithLock, TxnNumber txnNumber) const;
+
+ UpdateRequest _makeUpdateRequest(WithLock,
+ TxnNumber newTxnNumber,
+ const repl::OpTime& newLastWriteOpTime,
+ Date_t newLastWriteDate,
+ boost::optional<DurableTxnStateEnum> newState) const;
+
+ void _registerUpdateCacheOnCommit(OperationContext* opCtx,
+ TxnNumber newTxnNumber,
+ std::vector<StmtId> stmtIdsWritten,
+ const repl::OpTime& lastStmtIdWriteTs);
+
// Finishes committing the multi-document transaction after the storage-transaction has been
// committed, the oplog entry has been inserted into the oplog, and the transactions table has
// been updated.
@@ -562,8 +719,6 @@ private:
BSONObjBuilder* builder,
repl::ReadConcernArgs readConcernArgs) const;
- void _updateState(WithLock wl, const Session::RefreshState& newState);
-
// Bumps up the transaction number of this transaction and perform the necessary cleanup.
void _setNewTxnNumber(WithLock wl, const TxnNumber& txnNumber);
@@ -577,10 +732,6 @@ private:
// number.
void _continueMultiDocumentTransaction(WithLock wl, TxnNumber txnNumber);
- // Returns the session that this transaction belongs to.
- const Session* _getSession() const;
- Session* _getSession();
-
// Protects the member variables below.
mutable stdx::mutex _mutex;
@@ -599,10 +750,9 @@ private:
// Total size in bytes of all operations within the _transactionOperations vector.
size_t _transactionOperationBytes = 0;
- // This is the txnNumber that this transaction is actively working on. It can be different from
- // the current txnNumber of the parent session (since it can be changed in couple of ways, like
- // migration). In which case, it should make the necessary steps to also bump this number, like
- // aborting the current transaction.
+ // Tracks the last seen txn number for the session and is always >= to the transaction number in
+ // the last written txn record. When it is > than that in the last written txn record, this
+ // means a new transaction has begun on the session, but it hasn't yet performed any writes.
TxnNumber _activeTxnNumber{kUninitializedTxnNumber};
// Set when a snapshot read / transaction begins. Alleviates cache pressure by limiting how long
@@ -625,8 +775,28 @@ private:
std::vector<MultikeyPathInfo> _multikeyPathInfo;
- // Remembers the refresh count this object has read from Session.
- long long _lastStateRefreshCount{0};
+ //
+ // Retryable writes state
+ //
+
+ // Specifies whether the session information needs to be refreshed from storage
+ bool _isValid{false};
+
+ // Counter, incremented with each call to invalidate in order to discern invalidations, which
+ // happen during refresh
+ int _numInvalidations{0};
+
+ // Set to true if incomplete history is detected. For example, when the oplog to a write was
+ // truncated because it was too old.
+ bool _hasIncompleteHistory{false};
+
+ // Caches what is known to be the last written transaction record for the session
+ boost::optional<SessionTxnRecord> _lastWrittenSessionRecord;
+
+ // For the active txn, tracks which statement ids have been committed and at which oplog
+ // opTime. Used for fast retryability check and retrieving the previous write's data without
+ // having to scan through the oplog.
+ CommittedStatementTimestampMap _activeTxnCommittedStatements;
// Protects _transactionMetricsObserver. The concurrency rules are that const methods on
// _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for