/**
 *    Copyright (C) 2018-present MongoDB, Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the Server Side Public License, version 1,
 *    as published by MongoDB, Inc.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    Server Side Public License for more details.
 *
 *    You should have received a copy of the Server Side Public License
 *    along with this program. If not, see
 *    <http://www.mongodb.com/licensing/server-side-public-license>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the Server Side Public License in all respects for
 *    all of the code used other than as permitted herein. If you modify file(s)
 *    with this exception, you may extend this exception to your version of the
 *    file(s), but you are not obligated to do so. If you do not wish to do so,
 *    delete this exception statement from your version. If you delete this
 *    exception statement from all source files in the program, then also delete
 *    it in the license file.
 */

#pragma once

// NOTE(review): the three #include directives below have no header name, and several uses of
// boost::optional, std::set, std::vector and StringMap further down carry no template argument
// list. It looks like the angle-bracketed text was lost from this copy of the file -- restore it
// from version control before building; the intended names cannot be determined from this file.
#include
#include
#include

#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/repl/read_concern_args.h"
#include "mongo/s/async_requests_sender.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/shard_id.h"
#include "mongo/util/string_map.h"

namespace mongo {

/**
 * Keeps track of the transaction state for one session: the participant list, the coordinator,
 * the read concern the transaction was started with and, for snapshot level read concern, the
 * selected atClusterTime. A session is in use when it is being used by a request.
 */
class TransactionRouter {
public:
    // The default value to use as the statement id of the first command in the transaction if none
    // was sent.
    static const StmtId kDefaultFirstStmtId = 0;

    /**
     * Represents the options for a transaction that are shared across all participants. These
     * cannot be changed without restarting the transactions that may have already been begun on
     * every participant, i.e. clearing the current participant list.
     */
    struct SharedTransactionOptions {
        // Set for all distributed transactions.
        TxnNumber txnNumber;
        repl::ReadConcernArgs readConcernArgs;

        // Only set for transactions with snapshot level read concern.
        boost::optional atClusterTime;
    };

    /**
     * Represents a shard participant in a distributed transaction. Lives only for the duration of
     * the transaction that created it. All of its data members are const, so a participant's
     * state cannot be modified after construction.
     */
    class Participant {
    public:
        /**
         * Creates a participant from the coordinator flag, the statement id of the request that
         * created it, and the options shared by all participants of the transaction.
         *
         * NOTE(review): the definition is not in this header; presumably each argument is stored
         * in the member of the matching name -- confirm.
         */
        explicit Participant(bool isCoordinator,
                             StmtId stmtIdCreatedAt,
                             SharedTransactionOptions sharedOptions);

        /**
         * Attaches necessary fields if this is participating in a multi statement transaction.
         *
         * Takes the command object by value and returns a command object; the flag tells whether
         * this is the first statement sent to this participant.
         */
        BSONObj attachTxnFieldsIfNeeded(BSONObj cmd, bool isFirstStatementInThisParticipant) const;

        /**
         * True if the participant has been chosen as the coordinator for its transaction.
         */
        bool isCoordinator() const;

        /**
         * Returns the highest statement id of the command during which this participant was
         * created.
         */
        StmtId getStmtIdCreatedAt() const;

        /**
         * Returns the shared transaction options this participant was created with. The result is
         * a reference to a member of this object and is only valid while the participant is alive.
         */
        const auto& getSharedOptions() const {
            return _sharedOptions;
        }

    private:
        // Whether this participant is the coordinator for its transaction.
        const bool _isCoordinator{false};

        // The highest statement id of the request during which this participant was created.
        const StmtId _stmtIdCreatedAt{kUninitializedStmtId};

        // The options shared by all participants, as given when this participant was created.
        const SharedTransactionOptions _sharedOptions;
    };

    /**
     * Encapsulates the logic around selecting a global read timestamp for a sharded transaction at
     * snapshot level read concern.
     *
     * The first command in a transaction to target at least one shard must select a cluster time
     * timestamp before targeting, but may change the timestamp before contacting any shards to
     * allow optimizing the timestamp based on the targeted shards. If the first command encounters
     * a retryable error, e.g. StaleShardVersion or SnapshotTooOld, the retry may also select a new
     * timestamp. Once the first command has successfully completed, the timestamp cannot be
     * changed.
     */
    class AtClusterTime {
    public:
        /**
         * Returns the selected timestamp. Cannot be called until a timestamp has been set.
         */
        LogicalTime getTime() const;

        /**
         * Sets the timestamp and remembers the statement id of the command that set it.
         */
        void setTime(LogicalTime atClusterTime, StmtId currentStmtId);

        /**
         * True if the timestamp has been set to a non-null value.
         */
        bool isSet() const;

        /**
         * True if the timestamp can be changed by a command running at the given statement id.
         */
        bool canChange(StmtId currentStmtId) const;

    private:
        // The statement id of the command that selected the timestamp; kUninitializedStmtId until
        // one is recorded.
        StmtId _stmtIdSelectedAt = kUninitializedStmtId;

        // The selected timestamp.
        LogicalTime _atClusterTime;
    };

    /**
     * Creates the router state for the session with the given id.
     *
     * NOTE(review): this single-argument constructor is not explicit, so a LogicalSessionId
     * converts implicitly to a TransactionRouter -- confirm that is intended.
     */
    TransactionRouter(LogicalSessionId sessionId);

    /**
     * Starts a fresh transaction in this session or continues an existing one. Also cleans up the
     * previous transaction state.
     */
    void beginOrContinueTxn(OperationContext* opCtx, TxnNumber txnNumber, bool startTransaction);

    /**
     * Check-in / check-out of this router by a request.
     *
     * NOTE(review): defined out of line; presumably these clear and set _isCheckedOut
     * respectively -- confirm against the implementation.
     */
    void checkIn();
    void checkOut();

    /**
     * Attaches the required transaction related fields for a request to be sent to the given
     * shard.
     *
     * Calling this method has the following side effects:
     * 1. Potentially selecting a coordinator.
     * 2. Adding the shard to the list of participants.
     * 3. Also append fields for first statements (ex. startTransaction, readConcern)
     *    if the shard was newly added to the list of participants.
     */
    BSONObj attachTxnFieldsIfNeeded(const ShardId& shardId, const BSONObj& cmdObj);

    /**
     * Updates the transaction state to allow for a retry of the current command on a stale version
     * error. Will throw if the transaction cannot be continued.
     */
    void onStaleShardOrDbError(StringData cmdName);

    /**
     * Resets the transaction state to allow for a retry attempt. This includes clearing all
     * participants, clearing the coordinator, and resetting the global read timestamp. Will throw
     * if the transaction cannot be continued.
     */
    void onSnapshotError();

    /**
     * Updates the transaction tracking state to allow for a retry attempt on a view resolution
     * error.
     */
    void onViewResolutionError();

    /**
     * Computes and sets the atClusterTime for the current transaction based on the given query
     * parameters. Does nothing if the transaction does not have snapshot read concern or an
     * atClusterTime has already been selected and cannot be changed.
     */
    void computeAndSetAtClusterTime(OperationContext* opCtx,
                                    bool mustRunOnAll,
                                    const std::set& shardIds,
                                    const NamespaceString& nss,
                                    const BSONObj query,
                                    const BSONObj collation);

    /**
     * Computes and sets the atClusterTime for the current transaction based on the targeted shard.
     * Does nothing if the transaction does not have snapshot read concern or an atClusterTime has
     * already been selected and cannot be changed.
     */
    void computeAndSetAtClusterTimeForUnsharded(OperationContext* opCtx, const ShardId& shardId);

    /**
     * Sets the atClusterTime for the current transaction to the latest time in the router's logical
     * clock. Does nothing if the transaction does not have snapshot read concern or an
     * atClusterTime has already been selected and cannot be changed.
     */
    void setDefaultAtClusterTime(OperationContext* opCtx);

    /**
     * Returns the global read timestamp for this transaction. Returns boost::none for transactions
     * that don't run at snapshot level read concern or if a timestamp has not yet been selected.
     */
    const boost::optional& getAtClusterTime() const;

    /**
     * NOTE(review): defined out of line; presumably returns _isCheckedOut, i.e. whether this
     * router is currently being used by a request -- confirm. Not declared const.
     */
    bool isCheckedOut();

    /**
     * Returns the id of the session this router belongs to, by const reference.
     */
    const LogicalSessionId& getSessionId() const;

    /**
     * Returns the id of the coordinator participant.
     *
     * NOTE(review): presumably unset while no coordinator has been selected -- confirm.
     */
    boost::optional getCoordinatorId() const;

    /**
     * Commits the transaction. For transactions with multiple participants, this will initiate
     * the two phase commit procedure.
     */
    Shard::CommandResponse commitTransaction(OperationContext* opCtx);

    /**
     * Sends abort to all participants and returns the responses from all shards.
     */
    std::vector abortTransaction(OperationContext* opCtx);

    /**
     * Sends abort to all shards in the current participant list. Will retry on retryable errors,
     * but ignores the responses from each shard.
     */
    void implicitlyAbortTransaction(OperationContext* opCtx);

    /**
     * Extracts the runtime state attached to the operation context. Returns nullptr if none is
     * attached.
     */
    static TransactionRouter* get(OperationContext* opCtx);

    /**
     * Returns the participant of this transaction for the given shard.
     *
     * NOTE(review): presumably unset when the shard is not in the participant list -- confirm.
     */
    boost::optional getParticipant(const ShardId& shard);

private:
    /**
     * Run basic commit for transactions that touched a single shard.
     */
    Shard::CommandResponse _commitSingleShardTransaction(OperationContext* opCtx);

    /**
     * Run two phase commit for transactions that touched multiple shards.
     */
    Shard::CommandResponse _commitMultiShardTransaction(OperationContext* opCtx);

    /**
     * Sets the atClusterTime for the transaction to the greater of the given candidate time and
     * the user's afterClusterTime, if one was provided.
     */
    void _setAtClusterTime(const boost::optional& afterClusterTime, LogicalTime candidateTime);

    /**
     * Returns true if the current transaction can retry on a stale version error from a contacted
     * shard. This is always true except for an error received by a write that is not the first
     * overall statement in the sharded transaction. This is because the entire command will be
     * retried, and shards that were not stale and are targeted again may incorrectly execute the
     * command a second time.
     *
     * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of the
     * shards that returned a stale version error was involved in a previously completed statement
     * for this transaction.
     *
     * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow
     * retrying writes beyond the first overall statement.
     */
    bool _canContinueOnStaleShardOrDbError(StringData cmdName) const;

    /**
     * Returns true if the current transaction can retry on a snapshot error. This is only true on
     * the first command received for a transaction.
     */
    bool _canContinueOnSnapshotError() const;

    /**
     * Removes all participants created during the current statement from the participant list.
     */
    void _clearPendingParticipants();

    /**
     * Creates a new participant for the shard and returns a reference to it.
     */
    Participant& _createParticipant(const ShardId& shard);

    /**
     * Asserts the transaction has a valid read concern and, if the read concern level is snapshot,
     * has selected a non-null atClusterTime.
     */
    void _verifyReadConcern();

    /**
     * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime
     * matches the transaction's.
     */
    void _verifyParticipantAtClusterTime(const Participant& participant);

    // The id of the session this router tracks. Const, so it never changes after construction.
    const LogicalSessionId _sessionId;

    // The transaction number; kUninitializedTxnNumber until one is assigned.
    // NOTE(review): presumably updated by beginOrContinueTxn -- confirm.
    TxnNumber _txnNumber{kUninitializedTxnNumber};

    // True if this is currently being used by a request.
    bool _isCheckedOut{false};

    // Map of current participants of the current transaction.
    StringMap _participants;

    // The id of coordinator participant, used to construct prepare requests.
    boost::optional _coordinatorId;

    // The read concern the current transaction was started with.
    repl::ReadConcernArgs _readConcernArgs;

    // The cluster time of the timestamp all participant shards in the current transaction with
    // snapshot level read concern must read from. Only set for transactions running with snapshot
    // level read concern.
    boost::optional _atClusterTime;

    // The statement id of the latest received command for this transaction. For batch writes, this
    // will be the highest stmtId contained in the batch. Incremented by one if new commands do not
    // contain statement ids.
    StmtId _latestStmtId = kUninitializedStmtId;

    // The statement id of the command that began this transaction. Defaults to zero if no statement
    // id was included in the first command.
    StmtId _firstStmtId = kUninitializedStmtId;
};

/**
 * Scoped object, which checks out the session specified in the passed operation context and stores
 * it for later access by the command. The session is installed at construction time and is removed
 * at destruction. This can only be used for multi-statement transactions.
 */
class ScopedRouterSession {
    // Per the macro name, makes this type non-copyable.
    MONGO_DISALLOW_COPYING(ScopedRouterSession);

public:
    /**
     * Installs the session for the given operation context, as described in the class comment.
     */
    ScopedRouterSession(OperationContext* opCtx);

    /**
     * Removes the session installed by the constructor, as described in the class comment.
     */
    ~ScopedRouterSession();

private:
    // The pointer itself is const and cannot be reseated.
    // NOTE(review): presumably the operation context passed to the constructor -- confirm.
    OperationContext* const _opCtx;
};

}  // namespace mongo