diff options
author | Blake Oler <blake.oler@mongodb.com> | 2019-06-20 11:50:28 -0400 |
---|---|---|
committer | Blake Oler <blake.oler@mongodb.com> | 2019-07-02 15:22:48 -0400 |
commit | c512f1c4c82fec7dce815319be320e1b190788b6 (patch) | |
tree | 60942f8228aa1b3f2148ab0ec5efc89546d6b1f0 /src/mongo/s | |
parent | 324f6029326bc01d3adcd17a2467c49887147660 (diff) | |
download | mongo-c512f1c4c82fec7dce815319be320e1b190788b6.tar.gz |
SERVER-41676 Convert TransactionRouter to use observer pattern to synchronize internal data
with external observers
(cherry picked from commit c11b97788fcc91288deac647ddcc11625607d256)
Diffstat (limited to 'src/mongo/s')
20 files changed, 1008 insertions, 804 deletions
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp index 3d6cf6ed18b..372da718964 100644 --- a/src/mongo/s/cluster_commands_helpers.cpp +++ b/src/mongo/s/cluster_commands_helpers.cpp @@ -563,13 +563,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd( // Return the latest routing table if not running in a transaction with snapshot level read // concern. auto txnRouter = TransactionRouter::get(opCtx); - if (!txnRouter || !txnRouter->getAtClusterTime()) { + if (!txnRouter || !txnRouter.mustUseAtClusterTime()) { return catalogCache->getCollectionRoutingInfo(opCtx, nss); } - auto atClusterTime = txnRouter->getAtClusterTime(); - return catalogCache->getCollectionRoutingInfoAt( - opCtx, nss, atClusterTime->getTime().asTimestamp()); + auto atClusterTime = txnRouter.getSelectedAtClusterTime(); + return catalogCache->getCollectionRoutingInfoAt(opCtx, nss, atClusterTime.asTimestamp()); } } // namespace mongo diff --git a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp index 7defb877a0b..141b1db98f0 100644 --- a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp +++ b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp @@ -76,7 +76,7 @@ public: "abortTransaction can only be run within a session", txnRouter); - auto abortRes = txnRouter->abortTransaction(opCtx); + auto abortRes = txnRouter.abortTransaction(opCtx); CommandHelpers::filterCommandReplyForPassthrough(abortRes, &result); return true; } diff --git a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp index 38d6061b61b..6e4430d4127 100644 --- a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp +++ b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp @@ -75,11 +75,11 @@ public: auto txnRouter = TransactionRouter::get(opCtx); uassert(ErrorCodes::InvalidOptions, "commitTransaction can only be run within a session", - txnRouter != nullptr); + txnRouter); const auto commitCmd = CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj); - auto commitRes = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken()); + auto commitRes = txnRouter.commitTransaction(opCtx, commitCmd.getRecoveryToken()); CommandHelpers::filterCommandReplyForPassthrough(commitRes, &result); return true; } diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp index eb0d514c76c..ff8049b2187 100644 --- a/src/mongo/s/commands/cluster_distinct_cmd.cpp +++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp @@ -204,7 +204,7 @@ public: auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson(); if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->onViewResolutionError(opCtx, nss); + txnRouter.onViewResolutionError(opCtx, nss); } BSONObj aggResult = CommandHelpers::runCommandDirectly( diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 940c9299aaf..fb147d54a06 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -311,12 +311,10 @@ private: // transaction. We call _runCommand recursively, and this second time through // since it will be run as a transaction it will take the other code path to // updateShardKeyValueOnWouldChangeOwningShardError. - auto txnRouterForShardKeyChange = - documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); _runCommand(opCtx, shardId, shardVersion, nss, cmdObj, result); auto commitResponse = - documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction( - opCtx, txnRouterForShardKeyChange); + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); uassertStatusOK(getStatusFromCommandResult(commitResponse)); if (auto wcErrorElem = commitResponse["writeConcernError"]) { @@ -331,7 +329,7 @@ private: auto txnRouterForAbort = TransactionRouter::get(opCtx); if (txnRouterForAbort) - txnRouterForAbort->implicitlyAbortTransaction(opCtx, e.toStatus()); + txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus()); throw; } diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index 23e8e92b48c..917589675de 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -215,8 +215,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern); - auto txnRouterForShardKeyChange = - documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); + documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx); // Clear the error details from the response object before sending the write again response->unsetErrDetails(); ClusterWriter::write(opCtx, request, &stats, response); @@ -240,8 +239,8 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, } // Commit the transaction - auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction( - opCtx, txnRouterForShardKeyChange); + auto commitResponse = + documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx); uassertStatusOK(getStatusFromCommandResult(commitResponse)); @@ -268,7 +267,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx, auto txnRouterForAbort = TransactionRouter::get(opCtx); if (txnRouterForAbort) - txnRouterForAbort->implicitlyAbortTransaction(opCtx, status); + txnRouterForAbort.implicitlyAbortTransaction(opCtx, status); return false; } @@ -501,7 +500,7 @@ private: if (auto txnRouter = TransactionRouter::get(opCtx)) { auto writeCmdStatus = response.toStatus(); if (!writeCmdStatus.isOK()) { - txnRouter->implicitlyAbortTransaction(opCtx, writeCmdStatus); + txnRouter.implicitlyAbortTransaction(opCtx, writeCmdStatus); } } diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp index d10b9294110..119193d9ee5 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.cpp +++ b/src/mongo/s/commands/document_shard_key_update_util.cpp @@ -142,20 +142,21 @@ bool updateShardKeyForDocument(OperationContext* opCtx, opCtx, deleteCmdObj, insertCmdObj, nss.db(), documentKeyChangeInfo.getShouldUpsert()); } -TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) { +void startTransactionForShardKeyUpdate(OperationContext* opCtx) { auto txnRouter = TransactionRouter::get(opCtx); invariant(txnRouter); auto txnNumber = opCtx->getTxnNumber(); invariant(txnNumber); - txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); - - return txnRouter; + txnRouter.beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart); } -BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) { - return txnRouter->commitTransaction(opCtx, boost::none); +BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) { + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); + + return txnRouter.commitTransaction(opCtx, boost::none); } BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss, diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h index 8e3c4ac8ad2..a36579572cb 100644 --- a/src/mongo/s/commands/document_shard_key_update_util.h +++ b/src/mongo/s/commands/document_shard_key_update_util.h @@ -76,16 +76,16 @@ bool updateShardKeyForDocument(OperationContext* opCtx, int stmtId); /** - * Gets the transaction router and starts a transaction on this session. This method is called when - * WouldChangeOwningShard is thrown for a write that is not in a transaction already. + * Starts a transaction on this session. This method is called when WouldChangeOwningShard is thrown + * for a write that is not in a transaction already. */ -TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx); +void startTransactionForShardKeyUpdate(OperationContext* opCtx); /** * Commits the transaction on this session. This method is called to commit the transaction started * when WouldChangeOwningShard is thrown for a write that is not in a transaction already. */ -BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter); +BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx); /** * Creates the BSONObj that will be used to delete the pre-image document. Will also attach diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 1587c95c491..cbe2d514ec8 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -158,10 +158,12 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res */ void invokeInTransactionRouter(OperationContext* opCtx, CommandInvocation* invocation, - TransactionRouter* txnRouter, rpc::ReplyBuilderInterface* result) { + auto txnRouter = TransactionRouter::get(opCtx); + invariant(txnRouter); + // No-op if the transaction is not running with snapshot read concern. - txnRouter->setDefaultAtClusterTime(opCtx); + txnRouter.setDefaultAtClusterTime(opCtx); try { invocation->run(opCtx, result); @@ -173,7 +175,7 @@ void invokeInTransactionRouter(OperationContext* opCtx, throw; } - txnRouter->implicitlyAbortTransaction(opCtx, e.toStatus()); + txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus()); throw; } } @@ -181,12 +183,12 @@ void invokeInTransactionRouter(OperationContext* opCtx, /** * Adds info from the active transaction and the given reason as context to the active exception. */ -void addContextForTransactionAbortingError(TransactionRouter* txnRouter, +void addContextForTransactionAbortingError(StringData txnIdAsString, + StmtId latestStmtId, DBException& ex, StringData reason) { - ex.addContext(str::stream() << "Transaction " << txnRouter->txnIdToString() - << " was aborted on statement " - << txnRouter->getLatestStmtId() + ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement " + << latestStmtId << " due to: " << reason); } @@ -280,7 +282,7 @@ void execCommandClient(OperationContext* opCtx, auto txnRouter = TransactionRouter::get(opCtx); if (!supportsWriteConcern) { if (txnRouter) { - invokeInTransactionRouter(opCtx, invocation, txnRouter, result); + invokeInTransactionRouter(opCtx, invocation, result); } else { invocation->run(opCtx, result); } @@ -291,7 +293,7 @@ void execCommandClient(OperationContext* opCtx, opCtx->setWriteConcern(wcResult); if (txnRouter) { - invokeInTransactionRouter(opCtx, invocation, txnRouter, result); + invokeInTransactionRouter(opCtx, invocation, result); } else { invocation->run(opCtx, result); } @@ -314,8 +316,8 @@ void execCommandClient(OperationContext* opCtx, c->incrementCommandsFailed(); if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->implicitlyAbortTransaction(opCtx, - getStatusFromCommandResult(body.asTempObj())); + txnRouter.implicitlyAbortTransaction(opCtx, + getStatusFromCommandResult(body.asTempObj())); } } } @@ -430,7 +432,7 @@ void runCommand(OperationContext* opCtx, return TransactionRouter::TransactionActions::kContinue; })(); - txnRouter->beginOrContinueTxn(opCtx, *txnNumber, transactionAction); + txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction); } for (int tries = 0;; ++tries) { @@ -451,7 +453,7 @@ void runCommand(OperationContext* opCtx, auto responseBuilder = replyBuilder->getBodyBuilder(); if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->appendRecoveryToken(&responseBuilder); + txnRouter.appendRecoveryToken(&responseBuilder); } return; @@ -493,21 +495,27 @@ void runCommand(OperationContext* opCtx, // error cannot be retried on. if (auto txnRouter = TransactionRouter::get(opCtx)) { auto abortGuard = makeGuard( - [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); }); + [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); if (!canRetry) { - addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries"); + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "exhausted retries"); throw; } - if (!txnRouter->canContinueOnStaleShardOrDbError(commandName)) { + if (!txnRouter.canContinueOnStaleShardOrDbError(commandName)) { addContextForTransactionAbortingError( - txnRouter, ex, "an error from cluster data placement change"); + txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "an error from cluster data placement change"); throw; } // The error is retryable, so update transaction state before retrying. - txnRouter->onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); abortGuard.dismiss(); continue; @@ -526,21 +534,27 @@ void runCommand(OperationContext* opCtx, // error cannot be retried on. if (auto txnRouter = TransactionRouter::get(opCtx)) { auto abortGuard = makeGuard( - [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); }); + [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); if (!canRetry) { - addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries"); + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "exhausted retries"); throw; } - if (!txnRouter->canContinueOnStaleShardOrDbError(commandName)) { + if (!txnRouter.canContinueOnStaleShardOrDbError(commandName)) { addContextForTransactionAbortingError( - txnRouter, ex, "an error from cluster data placement change"); + txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "an error from cluster data placement change"); throw; } // The error is retryable, so update transaction state before retrying. - txnRouter->onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); + txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus()); abortGuard.dismiss(); continue; @@ -557,21 +571,26 @@ void runCommand(OperationContext* opCtx, // error cannot be retried on. if (auto txnRouter = TransactionRouter::get(opCtx)) { auto abortGuard = makeGuard( - [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); }); + [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); }); if (!canRetry) { - addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries"); + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "exhausted retries"); throw; } - if (!txnRouter->canContinueOnSnapshotError()) { - addContextForTransactionAbortingError( - txnRouter, ex, "a non-retryable snapshot error"); + if (!txnRouter.canContinueOnSnapshotError()) { + addContextForTransactionAbortingError(txnRouter.txnIdToString(), + txnRouter.getLatestStmtId(), + ex, + "a non-retryable snapshot error"); throw; } // The error is retryable, so update transaction state before retrying. - txnRouter->onSnapshotError(opCtx, ex.toStatus()); + txnRouter.onSnapshotError(opCtx, ex.toStatus()); abortGuard.dismiss(); continue; diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp index e376c4dd551..9671914d315 100644 --- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp +++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp @@ -50,7 +50,7 @@ std::vector<AsyncRequestsSender::Request> attachTxnDetails( for (auto request : requests) { newRequests.emplace_back( request.shardId, - txnRouter->attachTxnFieldsIfNeeded(opCtx, request.shardId, request.cmdObj)); + txnRouter.attachTxnFieldsIfNeeded(opCtx, request.shardId, request.cmdObj)); } return newRequests; @@ -66,7 +66,8 @@ void processReplyMetadata(OperationContext* opCtx, const AsyncRequestsSender::Re return; } - txnRouter->processParticipantResponse(response.shardId, response.swResponse.getValue().data); + txnRouter.processParticipantResponse( + opCtx, response.shardId, response.swResponse.getValue().data); } } // unnamed namespace diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp index 85da678c34f..6b3e1902537 100644 --- a/src/mongo/s/query/cluster_aggregate.cpp +++ b/src/mongo/s/query/cluster_aggregate.cpp @@ -954,7 +954,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx, result->resetToEmpty(); if (auto txnRouter = TransactionRouter::get(opCtx)) { - txnRouter->onViewResolutionError(opCtx, requestedNss); + txnRouter.onViewResolutionError(opCtx, requestedNss); } // We pass both the underlying collection namespace and the view namespace here. The diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp index a41d5a789d7..07c10167123 100644 --- a/src/mongo/s/query/cluster_find.cpp +++ b/src/mongo/s/query/cluster_find.cpp @@ -452,15 +452,15 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx, catalogCache->onStaleShardVersion(std::move(routingInfo)); if (auto txnRouter = TransactionRouter::get(opCtx)) { - if (!txnRouter->canContinueOnStaleShardOrDbError(kFindCmdName)) { + if (!txnRouter.canContinueOnStaleShardOrDbError(kFindCmdName)) { throw; } // Reset the default global read timestamp so the retry's routing table reflects the // chunk placement after the refresh (no-op if the transaction is not running with // snapshot read concern). - txnRouter->onStaleShardOrDbError(opCtx, kFindCmdName, ex.toStatus()); - txnRouter->setDefaultAtClusterTime(opCtx); + txnRouter.onStaleShardOrDbError(opCtx, kFindCmdName, ex.toStatus()); + txnRouter.setDefaultAtClusterTime(opCtx); } } } diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index cf93fffed32..06ff22610f9 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -235,14 +235,52 @@ BSONObj sendCommitDirectlyToShards(OperationContext* opCtx, const std::vector<Sh return lastResult; } +// Helper to convert the CommitType enum into a human readable string for diagnostics. +std::string commitTypeToString(TransactionRouter::CommitType state) { + switch (state) { + case TransactionRouter::CommitType::kNotInitiated: + return "notInitiated"; + case TransactionRouter::CommitType::kNoShards: + return "noShards"; + case TransactionRouter::CommitType::kSingleShard: + return "singleShard"; + case TransactionRouter::CommitType::kSingleWriteShard: + return "singleWriteShard"; + case TransactionRouter::CommitType::kReadOnly: + return "readOnly"; + case TransactionRouter::CommitType::kTwoPhaseCommit: + return "twoPhaseCommit"; + case TransactionRouter::CommitType::kRecoverWithToken: + return "recoverWithToken"; + } + MONGO_UNREACHABLE; +} + } // unnamed namespace +TransactionRouter::TransactionRouter() = default; + +TransactionRouter::~TransactionRouter() = default; + +TransactionRouter::Observer::Observer(const ObservableSession& osession) + : Observer(&getTransactionRouter(osession.get())) {} + +TransactionRouter::Router::Router(OperationContext* opCtx) + : Observer([opCtx]() -> TransactionRouter* { + if (auto session = OperationContextSession::get(opCtx)) { + return &getTransactionRouter(session); + } + return nullptr; + }()) {} + TransactionRouter::Participant::Participant(bool inIsCoordinator, StmtId inStmtIdCreatedAt, + ReadOnly inReadOnly, SharedTransactionOptions inSharedOptions) : isCoordinator(inIsCoordinator), - stmtIdCreatedAt(inStmtIdCreatedAt), - sharedOptions(std::move(inSharedOptions)) {} + readOnly(inReadOnly), + sharedOptions(std::move(inSharedOptions)), + stmtIdCreatedAt(inStmtIdCreatedAt) {} BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded( BSONObj cmd, bool isFirstStatementInThisParticipant) const { @@ -302,12 +340,13 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded( return newCmd.obj(); } -void TransactionRouter::processParticipantResponse(const ShardId& shardId, - const BSONObj& responseObj) { +void TransactionRouter::Router::processParticipantResponse(OperationContext* opCtx, + const ShardId& shardId, + const BSONObj& responseObj) { auto participant = getParticipant(shardId); invariant(participant, "Participant should exist if processing participant response"); - if (_terminationInitiated) { + if (p().terminationInitiated) { // Do not process the transaction metadata after commit or abort have been initiated, // since a participant's state is partially reset on commit and abort. return; @@ -318,7 +357,7 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId, return; } - if (participant->stmtIdCreatedAt != _latestStmtId) { + if (participant->stmtIdCreatedAt != p().latestStmtId) { uassert( 51112, str::stream() << "readOnly field for participant " << shardId @@ -332,7 +371,7 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId, if (txnResponseMetadata.getReadOnly()) { if (participant->readOnly == Participant::ReadOnly::kUnset) { LOG(3) << txnIdToString() << " Marking " << shardId << " as read-only"; - participant->readOnly = Participant::ReadOnly::kReadOnly; + _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kReadOnly); return; } @@ -348,11 +387,12 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId, if (participant->readOnly != Participant::ReadOnly::kNotReadOnly) { LOG(3) << txnIdToString() << " Marking " << shardId << " as having done a write"; - participant->readOnly = Participant::ReadOnly::kNotReadOnly; - if (!_recoveryShardId) { + _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kNotReadOnly); + + if (!p().recoveryShardId) { LOG(3) << txnIdToString() << " Choosing " << shardId << " as recovery shard"; - _recoveryShardId = shardId; + p().recoveryShardId = shardId; } } } @@ -373,39 +413,26 @@ bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const { return !_stmtIdSelectedAt || *_stmtIdSelectedAt == currentStmtId; } -TransactionRouter* TransactionRouter::get(OperationContext* opCtx) { - const auto session = OperationContextSession::get(opCtx); - if (session) { - return &getTransactionRouter(session); - } - - return nullptr; +bool TransactionRouter::Router::mustUseAtClusterTime() const { + return o().atClusterTime.is_initialized(); } -TransactionRouter* get(const ObservableSession& osession) { - return &getTransactionRouter(osession.get()); +LogicalTime TransactionRouter::Router::getSelectedAtClusterTime() const { + invariant(o().atClusterTime); + return o().atClusterTime->getTime(); } -TransactionRouter::TransactionRouter() = default; - -TransactionRouter::~TransactionRouter() = default; - -const boost::optional<TransactionRouter::AtClusterTime>& TransactionRouter::getAtClusterTime() - const { - return _atClusterTime; +const boost::optional<ShardId>& TransactionRouter::Router::getCoordinatorId() const { + return o().coordinatorId; } -const boost::optional<ShardId>& TransactionRouter::getCoordinatorId() const { - return _coordinatorId; +const boost::optional<ShardId>& TransactionRouter::Router::getRecoveryShardId() const { + return p().recoveryShardId; } -const boost::optional<ShardId>& TransactionRouter::getRecoveryShardId() const { - return _recoveryShardId; -} - -BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx, - const ShardId& shardId, - const BSONObj& cmdObj) { +BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opCtx, + const ShardId& shardId, + const BSONObj& cmdObj) { RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted(); if (auto txnPart = getParticipant(shardId)) { LOG(4) << txnIdToString() @@ -413,66 +440,93 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx, return txnPart->attachTxnFieldsIfNeeded(cmdObj, false); } - auto txnPart = _createParticipant(shardId); + auto txnPart = _createParticipant(opCtx, shardId); LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId; - if (!_isRecoveringCommit) { + if (!p().isRecoveringCommit) { // Don't update participant stats during recovery since the participant list isn't known. RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants(); } + return txnPart.attachTxnFieldsIfNeeded(cmdObj, true); } -void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) { +void TransactionRouter::Router::_verifyParticipantAtClusterTime(const Participant& participant) { const auto& participantAtClusterTime = participant.sharedOptions.atClusterTime; invariant(participantAtClusterTime); - invariant(*participantAtClusterTime == _atClusterTime->getTime()); + invariant(*participantAtClusterTime == o().atClusterTime->getTime()); } -TransactionRouter::Participant* TransactionRouter::getParticipant(const ShardId& shard) { - const auto iter = _participants.find(shard.toString()); - if (iter == _participants.end()) +const TransactionRouter::Participant* TransactionRouter::Router::getParticipant( + const ShardId& shard) { + const auto iter = o().participants.find(shard.toString()); + if (iter == o().participants.end()) return nullptr; - if (_atClusterTime) { + if (o().atClusterTime) { _verifyParticipantAtClusterTime(iter->second); } return &iter->second; } -TransactionRouter::Participant& TransactionRouter::_createParticipant(const ShardId& shard) { +TransactionRouter::Participant& TransactionRouter::Router::_createParticipant( + OperationContext* opCtx, const ShardId& shard) { + // The first participant is chosen as the coordinator. - auto isFirstParticipant = _participants.empty(); + auto isFirstParticipant = o().participants.empty(); if (isFirstParticipant) { - invariant(!_coordinatorId); - _coordinatorId = shard.toString(); + invariant(!o().coordinatorId); + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).coordinatorId = shard.toString(); } SharedTransactionOptions sharedOptions = { - _txnNumber, - _readConcernArgs, - _atClusterTime ? boost::optional<LogicalTime>(_atClusterTime->getTime()) : boost::none}; + o().txnNumber, + o().readConcernArgs, + o().atClusterTime ? boost::optional<LogicalTime>(o().atClusterTime->getTime()) + : boost::none}; + stdx::lock_guard<Client> lk(*opCtx->getClient()); auto resultPair = - _participants.try_emplace(shard.toString(), - TransactionRouter::Participant( - isFirstParticipant, _latestStmtId, std::move(sharedOptions))); + o(lk).participants.try_emplace(shard.toString(), + TransactionRouter::Participant(isFirstParticipant, + p().latestStmtId, + Participant::ReadOnly::kUnset, + std::move(sharedOptions))); return resultPair.first->second; } -void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction( +void TransactionRouter::Router::_setReadOnlyForParticipant(OperationContext* opCtx, + const ShardId& shard, + const Participant::ReadOnly readOnly) { + const auto iter = o().participants.find(shard.toString()); + invariant(iter != o().participants.end()); + const auto currentParticipant = iter->second; + + auto newParticipant = + TransactionRouter::Participant(currentParticipant.isCoordinator, + currentParticipant.stmtIdCreatedAt, + readOnly, + std::move(currentParticipant.sharedOptions)); + + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).participants.erase(iter); + o(lk).participants.try_emplace(shard.toString(), std::move(newParticipant)); +} + +void TransactionRouter::Router::_assertAbortStatusIsOkOrNoSuchTransaction( const AsyncRequestsSender::Response& response) const { auto shardResponse = uassertStatusOKWithContext( std::move(response.swResponse), str::stream() << "Failed to send abort to shard " << response.shardId << " between retries of statement " - << _latestStmtId); + << p().latestStmtId); auto status = getStatusFromCommandResult(shardResponse.data); uassert(ErrorCodes::NoSuchTransaction, str::stream() << txnIdToString() << "Transaction aborted between retries of statement " - << _latestStmtId + << p().latestStmtId << " due to error: " << status << " from shard: " @@ -483,17 +537,17 @@ void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction( // concern error. } -std::vector<ShardId> TransactionRouter::_getPendingParticipants() const { +std::vector<ShardId> TransactionRouter::Router::_getPendingParticipants() const { std::vector<ShardId> pendingParticipants; - for (const auto& participant : _participants) { - if (participant.second.stmtIdCreatedAt == _latestStmtId) { + for (const auto& participant : o().participants) { + if (participant.second.stmtIdCreatedAt == p().latestStmtId) { pendingParticipants.emplace_back(ShardId(participant.first)); } } return pendingParticipants; } -void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) { +void TransactionRouter::Router::_clearPendingParticipants(OperationContext* opCtx) { const auto pendingParticipants = _getPendingParticipants(); // Send abort to each pending participant. This resets their transaction state and guarantees no @@ -520,30 +574,33 @@ void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) { // If the participant being removed was chosen as the recovery shard, reset the recovery // shard. This is safe because this participant is a pending participant, meaning it // cannot have been returned in the recoveryToken on an earlier statement. - if (_recoveryShardId && *_recoveryShardId == participant) { - _recoveryShardId.reset(); + if (p().recoveryShardId && *p().recoveryShardId == participant) { + p().recoveryShardId.reset(); } - invariant(_participants.erase(participant)); + + stdx::lock_guard<Client> lk(*opCtx->getClient()); + invariant(o(lk).participants.erase(participant)); } // If there are no more participants, also clear the coordinator id because a new one must be // chosen by the retry. - if (_participants.empty()) { - _coordinatorId.reset(); + if (o().participants.empty()) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).coordinatorId.reset(); return; } // If participants were created by an earlier command, the coordinator must be one of them. - invariant(_coordinatorId); - invariant(_participants.count(*_coordinatorId) == 1); + invariant(o().coordinatorId); + invariant(o().participants.count(*o().coordinatorId) == 1); } -bool TransactionRouter::canContinueOnStaleShardOrDbError(StringData cmdName) const { +bool TransactionRouter::Router::canContinueOnStaleShardOrDbError(StringData cmdName) const { if (MONGO_FAIL_POINT(enableStaleVersionAndSnapshotRetriesWithinTransactions)) { // We can always retry on the first overall statement because all targeted participants must // be pending, so the retry will restart the local transaction on each one, overwriting any // effects from the first attempt. - if (_latestStmtId == _firstStmtId) { + if (p().latestStmtId == p().firstStmtId) { return true; } @@ -563,9 +620,9 @@ bool TransactionRouter::canContinueOnStaleShardOrDbError(StringData cmdName) con return false; } -void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx, - StringData cmdName, - const Status& errorStatus) { +void TransactionRouter::Router::onStaleShardOrDbError(OperationContext* opCtx, + StringData cmdName, + const Status& errorStatus) { invariant(canContinueOnStaleShardOrDbError(cmdName)); LOG(3) << txnIdToString() @@ -576,7 +633,8 @@ void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx, _clearPendingParticipants(opCtx); } -void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss) { +void TransactionRouter::Router::onViewResolutionError(OperationContext* opCtx, + const NamespaceString& nss) { // The router can always retry on a view resolution error. LOG(3) << txnIdToString() @@ -588,72 +646,80 @@ void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const Nam _clearPendingParticipants(opCtx); } -bool TransactionRouter::canContinueOnSnapshotError() const { +bool TransactionRouter::Router::canContinueOnSnapshotError() const { if (MONGO_FAIL_POINT(enableStaleVersionAndSnapshotRetriesWithinTransactions)) { - return _atClusterTime && _atClusterTime->canChange(_latestStmtId); + return o().atClusterTime && o().atClusterTime->canChange(p().latestStmtId); } return false; } -void TransactionRouter::onSnapshotError(OperationContext* opCtx, const Status& errorStatus) { +void TransactionRouter::Router::onSnapshotError(OperationContext* opCtx, + const Status& errorStatus) { invariant(canContinueOnSnapshotError()); LOG(3) << txnIdToString() << " Clearing pending participants and resetting global snapshot " "timestamp after snapshot error: " - << errorStatus << ", previous timestamp: " << _atClusterTime->getTime(); + << errorStatus << ", previous timestamp: " << o().atClusterTime->getTime(); // The transaction must be restarted on all participants because a new read timestamp will be // selected, so clear all pending participants. Snapshot errors are only retryable on the first // client statement, so all participants should be cleared, including the coordinator. _clearPendingParticipants(opCtx); - invariant(_participants.empty()); - invariant(!_coordinatorId); + invariant(o().participants.empty()); + invariant(!o().coordinatorId); + + stdx::lock_guard<Client> lk(*opCtx->getClient()); // Reset the global snapshot timestamp so the retry will select a new one. - _atClusterTime.reset(); - _atClusterTime.emplace(); + o(lk).atClusterTime.reset(); + o(lk).atClusterTime.emplace(); } -void TransactionRouter::setDefaultAtClusterTime(OperationContext* opCtx) { - if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) { +void TransactionRouter::Router::setDefaultAtClusterTime(OperationContext* opCtx) { + if (!o().atClusterTime || !o().atClusterTime->canChange(p().latestStmtId)) { return; } auto defaultTime = LogicalClock::get(opCtx)->getClusterTime(); - _setAtClusterTime(repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(), defaultTime); + _setAtClusterTime( + opCtx, repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(), defaultTime); } -void TransactionRouter::_setAtClusterTime(const boost::optional<LogicalTime>& afterClusterTime, - LogicalTime candidateTime) { +void TransactionRouter::Router::_setAtClusterTime( + OperationContext* opCtx, + const boost::optional<LogicalTime>& afterClusterTime, + LogicalTime candidateTime) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + // If the user passed afterClusterTime, the chosen time must be greater than or equal to it. if (afterClusterTime && *afterClusterTime > candidateTime) { - _atClusterTime->setTime(*afterClusterTime, _latestStmtId); + o(lk).atClusterTime->setTime(*afterClusterTime, p().latestStmtId); return; } LOG(2) << txnIdToString() << " Setting global snapshot timestamp to " << candidateTime - << " on statement " << _latestStmtId; + << " on statement " << p().latestStmtId; - _atClusterTime->setTime(candidateTime, _latestStmtId); + o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId); } -void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, - TxnNumber txnNumber, - TransactionActions action) { - if (txnNumber < _txnNumber) { +void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action) { + if (txnNumber < o().txnNumber) { // This transaction is older than the transaction currently in progress, so throw an error. uasserted(ErrorCodes::TransactionTooOld, str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber " - << _txnNumber + << o().txnNumber << " seen in session " << _sessionId()); - } else if (txnNumber == _txnNumber) { + } else if (txnNumber == o().txnNumber) { // This is the same transaction as the one in progress. switch (action) { case TransactionActions::kStart: { uasserted(ErrorCodes::ConflictingOperationInProgress, - str::stream() << "txnNumber " << _txnNumber << " for session " + str::stream() << "txnNumber " << o().txnNumber << " for session " << _sessionId() << " already started"); } @@ -662,15 +728,15 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, "Only the first command in a transaction may specify a readConcern", repl::ReadConcernArgs::get(opCtx).isEmpty()); - repl::ReadConcernArgs::get(opCtx) = _readConcernArgs; - ++_latestStmtId; + repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs; + ++p().latestStmtId; return; } case TransactionActions::kCommit: - ++_latestStmtId; + ++p().latestStmtId; return; } - } else if (txnNumber > _txnNumber) { + } else if (txnNumber > o().txnNumber) { // This is a newer transaction. switch (action) { case TransactionActions::kStart: { @@ -682,12 +748,17 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, !readConcernArgs.hasLevel() || isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel())); - _resetRouterState(txnNumber); + _resetRouterState(opCtx, txnNumber); - _readConcernArgs = readConcernArgs; + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).readConcernArgs = readConcernArgs; + } - if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) { - _atClusterTime.emplace(); + if (o().readConcernArgs.getLevel() == + repl::ReadConcernLevel::kSnapshotReadConcern) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).atClusterTime.emplace(); } _onNewTransaction(opCtx); @@ -696,16 +767,17 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, } case TransactionActions::kContinue: { uasserted(ErrorCodes::NoSuchTransaction, - str::stream() << "cannot continue txnId " << _txnNumber << " for session " + str::stream() << "cannot continue txnId " << o().txnNumber + << " for session " << _sessionId() << " with txnId " << txnNumber); } case TransactionActions::kCommit: { - _resetRouterState(txnNumber); + _resetRouterState(opCtx, txnNumber); // If the first action seen by the router for this transaction is to commit, that // means that the client is attempting to recover a commit decision. - _isRecoveringCommit = true; + p().isRecoveringCommit = true; _onBeginRecoveringDecision(opCtx); LOG(3) << txnIdToString() << " Commit recovery started"; @@ -716,18 +788,18 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx, MONGO_UNREACHABLE; } -const LogicalSessionId& TransactionRouter::_sessionId() const { - const auto* owningSession = getTransactionRouter.owner(this); +const LogicalSessionId& TransactionRouter::Router::_sessionId() const { + const auto* owningSession = getTransactionRouter.owner(_tr); return owningSession->getSessionId(); } -BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) { - invariant(_coordinatorId); - auto coordinatorIter = _participants.find(*_coordinatorId); - invariant(coordinatorIter != _participants.end()); +BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) { + invariant(o().coordinatorId); + auto coordinatorIter = o().participants.find(*o().coordinatorId); + invariant(coordinatorIter != o().participants.end()); std::vector<CommitParticipant> participantList; - for (const auto& participantEntry : _participants) { + for (const auto& participantEntry : o().participants) { CommitParticipant commitParticipant; commitParticipant.setShardId(participantEntry.first); participantList.push_back(std::move(commitParticipant)); @@ -740,13 +812,13 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON())); LOG(3) << txnIdToString() - << " Committing using two-phase commit, coordinator: " << *_coordinatorId; + << " Committing using two-phase commit, coordinator: " << *o().coordinatorId; MultiStatementTransactionRequestsSender ars( opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), NamespaceString::kAdminDb, - {{*_coordinatorId, coordinateCommitCmdObj}}, + {{*o().coordinatorId, coordinateCommitCmdObj}}, ReadPreferenceSetting{ReadPreference::PrimaryOnly}, Shard::RetryPolicy::kIdempotent); @@ -757,9 +829,9 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) return response.swResponse.getValue().data; } -BSONObj TransactionRouter::commitTransaction( +BSONObj TransactionRouter::Router::commitTransaction( OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) { - _terminationInitiated = true; + p().terminationInitiated = true; auto commitRes = _commitTransaction(opCtx, recoveryToken); @@ -786,32 +858,41 @@ BSONObj TransactionRouter::commitTransaction( return commitRes; } -BSONObj TransactionRouter::_commitTransaction( +BSONObj TransactionRouter::Router::_commitTransaction( OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) { - if (_isRecoveringCommit) { + + if (p().isRecoveringCommit) { uassert(50940, "Cannot recover the transaction decision without a recoveryToken", recoveryToken); - _commitType = CommitType::kRecoverWithToken; - _onStartCommit(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kRecoverWithToken; + _onStartCommit(lk, opCtx); + } + return _commitWithRecoveryToken(opCtx, *recoveryToken); } - if (_participants.empty()) { + if (o().participants.empty()) { // The participants list can be empty if a transaction was began on mongos, but it never // ended up targeting any hosts. Such cases are legal for example if a find is issued // against a non-existent database. uassert(ErrorCodes::IllegalOperation, "Cannot commit without participants", - _txnNumber != kUninitializedTxnNumber); - _commitType = CommitType::kNoShards; - _onStartCommit(opCtx); + o().txnNumber != kUninitializedTxnNumber); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kNoShards; + _onStartCommit(lk, opCtx); + } + return BSON("ok" << 1); } std::vector<ShardId> readOnlyShards; std::vector<ShardId> writeShards; - for (const auto& participant : _participants) { + for (const auto& participant : o().participants) { switch (participant.second.readOnly) { case Participant::ReadOnly::kUnset: uasserted(ErrorCodes::NoSuchTransaction, @@ -829,20 +910,29 @@ BSONObj TransactionRouter::_commitTransaction( } } - if (_participants.size() == 1) { - ShardId shardId = _participants.cbegin()->first; + if (o().participants.size() == 1) { + ShardId shardId = o().participants.cbegin()->first; LOG(3) << txnIdToString() << " Committing single-shard transaction, single participant: " << shardId; - _commitType = CommitType::kSingleShard; - _onStartCommit(opCtx); + + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kSingleShard; + _onStartCommit(lk, opCtx); + } + return sendCommitDirectlyToShards(opCtx, {shardId}); } if (writeShards.size() == 0) { LOG(3) << txnIdToString() << " Committing read-only transaction on " << readOnlyShards.size() << " shards"; - _commitType = CommitType::kReadOnly; - _onStartCommit(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kReadOnly; + _onStartCommit(lk, opCtx); + } + return sendCommitDirectlyToShards(opCtx, readOnlyShards); } @@ -850,8 +940,12 @@ BSONObj TransactionRouter::_commitTransaction( LOG(3) << txnIdToString() << " Committing single-write-shard transaction with " << readOnlyShards.size() << " read-only shards, write shard: " << writeShards.front(); - _commitType = CommitType::kSingleWriteShard; - _onStartCommit(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kSingleWriteShard; + _onStartCommit(lk, opCtx); + } + const auto readOnlyShardsResponse = sendCommitDirectlyToShards(opCtx, readOnlyShards); if (!getStatusFromCommandResult(readOnlyShardsResponse).isOK() || @@ -861,29 +955,34 @@ BSONObj TransactionRouter::_commitTransaction( return sendCommitDirectlyToShards(opCtx, writeShards); } - _commitType = CommitType::kTwoPhaseCommit; - _onStartCommit(opCtx); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).commitType = CommitType::kTwoPhaseCommit; + _onStartCommit(lk, opCtx); + } + return _handOffCommitToCoordinator(opCtx); } -BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) { +BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) { _onExplicitAbort(opCtx); // The router has yet to send any commands to a remote shard for this transaction. // Return the same error that would have been returned by a shard. uassert(ErrorCodes::NoSuchTransaction, "no known command has been sent by this router for this transaction", - !_participants.empty()); + !o().participants.empty()); - _terminationInitiated = true; + p().terminationInitiated = true; auto abortCmd = BSON("abortTransaction" << 1); std::vector<AsyncRequestsSender::Request> abortRequests; - for (const auto& participantEntry : _participants) { + for (const auto& participantEntry : o().participants) { abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); } - LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size() << " shard(s)"; + LOG(3) << txnIdToString() << " Aborting transaction on " << o().participants.size() + << " shard(s)"; const auto responses = gatherResponses(opCtx, NamespaceString::kAdminDb, @@ -915,10 +1014,10 @@ BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) { return lastResult; } -void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx, - const Status& errorStatus) { - if (_commitType == CommitType::kTwoPhaseCommit || - _commitType == CommitType::kRecoverWithToken) { +void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opCtx, + const Status& errorStatus) { + if (o().commitType == CommitType::kTwoPhaseCommit || + o().commitType == CommitType::kRecoverWithToken) { LOG(3) << txnIdToString() << " Router not sending implicit abortTransaction because commit " "may have been handed off to the coordinator"; return; @@ -926,19 +1025,19 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx, _onImplicitAbort(opCtx, errorStatus); - if (_participants.empty()) { + if (o().participants.empty()) { return; } - _terminationInitiated = true; + p().terminationInitiated = true; auto abortCmd = BSON("abortTransaction" << 1); std::vector<AsyncRequestsSender::Request> abortRequests; - for (const auto& participantEntry : _participants) { + for (const auto& participantEntry : o().participants) { abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd); } - LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << _participants.size() + LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << o().participants.size() << " shard(s) due to error: " << errorStatus; try { @@ -955,48 +1054,50 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx, } } -std::string TransactionRouter::txnIdToString() const { - return str::stream() << _sessionId().getId() << ":" << _txnNumber; +std::string TransactionRouter::Router::txnIdToString() const { + return str::stream() << _sessionId().getId() << ":" << o().txnNumber; } -void TransactionRouter::appendRecoveryToken(BSONObjBuilder* builder) const { +void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) const { BSONObjBuilder recoveryTokenBuilder( builder->subobjStart(CommitTransaction::kRecoveryTokenFieldName)); TxnRecoveryToken recoveryToken; // The recovery shard is chosen on the first statement that did a write (transactions that only // did reads do not need to be recovered; they can just be retried). - if (_recoveryShardId) { - invariant(_participants.find(*_recoveryShardId)->second.readOnly == + if (p().recoveryShardId) { + invariant(o().participants.find(*p().recoveryShardId)->second.readOnly == Participant::ReadOnly::kNotReadOnly); - recoveryToken.setRecoveryShardId(*_recoveryShardId); + recoveryToken.setRecoveryShardId(*p().recoveryShardId); } recoveryToken.serialize(&recoveryTokenBuilder); recoveryTokenBuilder.doneFast(); } -void TransactionRouter::_resetRouterState(const TxnNumber& txnNumber) { - _txnNumber = txnNumber; - _commitType = CommitType::kNotInitiated; - _isRecoveringCommit = false; - _participants.clear(); - _coordinatorId.reset(); - _recoveryShardId.reset(); - _readConcernArgs = {}; - _atClusterTime.reset(); - _abortCause = std::string(); - _timingStats = TimingStats(); - _terminationInitiated = false; +void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx, + const TxnNumber& txnNumber) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).txnNumber = txnNumber; + o(lk).commitType = CommitType::kNotInitiated; + p().isRecoveringCommit = false; + o(lk).participants.clear(); + o(lk).coordinatorId.reset(); + p().recoveryShardId.reset(); + o(lk).readConcernArgs = {}; + o(lk).atClusterTime.reset(); + o(lk).abortCause = std::string(); + o(lk).timingStats = TimingStats(); + p().terminationInitiated = false; // TODO SERVER-37115: Parse statement ids from the client and remember the statement id // of the command that started the transaction, if one was included. - _latestStmtId = kDefaultFirstStmtId; - _firstStmtId = kDefaultFirstStmtId; + p().latestStmtId = kDefaultFirstStmtId; + p().firstStmtId = kDefaultFirstStmtId; }; -BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx, - const TxnRecoveryToken& recoveryToken) { +BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* opCtx, + const TxnRecoveryToken& recoveryToken) { uassert(ErrorCodes::NoSuchTransaction, "Recovery token is empty, meaning the transaction only performed reads and can be " "safely retried", @@ -1026,13 +1127,13 @@ BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx, .response; } -void TransactionRouter::_logSlowTransaction(OperationContext* opCtx, - TerminationCause terminationCause) const { +void TransactionRouter::Router::_logSlowTransaction(OperationContext* opCtx, + TerminationCause terminationCause) const { log() << "transaction " << _transactionInfoForLog(opCtx, terminationCause); } -std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx, - TerminationCause terminationCause) const { +std::string TransactionRouter::Router::_transactionInfoForLog( + OperationContext* opCtx, TerminationCause terminationCause) const { StringBuilder sb; BSONObjBuilder parametersBuilder; @@ -1041,24 +1142,24 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx, _sessionId().serialize(&lsidBuilder); lsidBuilder.doneFast(); - parametersBuilder.append("txnNumber", _txnNumber); + parametersBuilder.append("txnNumber", o().txnNumber); parametersBuilder.append("autocommit", false); - _readConcernArgs.appendInfo(¶metersBuilder); + o().readConcernArgs.appendInfo(¶metersBuilder); sb << "parameters:" << parametersBuilder.obj().toString() << ","; - if (_atClusterTime) { - sb << " globalReadTimestamp:" << _atClusterTime->getTime().toString() << ","; + if (o().atClusterTime) { + sb << " globalReadTimestamp:" << o().atClusterTime->getTime().toString() << ","; } - if (_commitType != CommitType::kRecoverWithToken) { + if (o().commitType != CommitType::kRecoverWithToken) { // We don't know the participants if we're recovering the commit. - sb << " numParticipants:" << _participants.size() << ","; + sb << " numParticipants:" << o().participants.size() << ","; } - if (_commitType == CommitType::kTwoPhaseCommit) { - invariant(_coordinatorId); - sb << " coordinator:" << *_coordinatorId << ","; + if (o().commitType == CommitType::kTwoPhaseCommit) { + invariant(o().coordinatorId); + sb << " coordinator:" << *o().coordinatorId << ","; } auto tickSource = opCtx->getServiceContext()->getTickSource(); @@ -1067,22 +1168,22 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx, if (terminationCause == TerminationCause::kCommitted) { sb << " terminationCause:committed,"; - invariant(_commitType != CommitType::kNotInitiated); - invariant(_abortCause.empty()); + invariant(o().commitType != CommitType::kNotInitiated); + invariant(o().abortCause.empty()); } else { sb << " terminationCause:aborted,"; - invariant(!_abortCause.empty()); - sb << " abortCause:" << _abortCause << ","; + invariant(!o().abortCause.empty()); + sb << " abortCause:" << o().abortCause << ","; // TODO SERVER-40985: Log abortSource } - if (_commitType != CommitType::kNotInitiated) { - sb << " commitType:" << _commitTypeToString(_commitType) << ","; + if (o().commitType != CommitType::kNotInitiated) { + sb << " commitType:" << commitTypeToString(o().commitType) << ","; sb << " commitDurationMicros:" - << durationCount<Microseconds>(_timingStats.getCommitDuration(tickSource, curTicks)) + << durationCount<Microseconds>(o().timingStats.getCommitDuration(tickSource, curTicks)) << ","; } @@ -1092,29 +1193,36 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx, // Total duration of the transaction. Logged at the end of the line for consistency with slow // command logging. - sb << " " << duration_cast<Milliseconds>(_timingStats.getDuration(tickSource, curTicks)); + sb << " " << duration_cast<Milliseconds>(o().timingStats.getDuration(tickSource, curTicks)); return sb.str(); } -void TransactionRouter::_onNewTransaction(OperationContext* opCtx) { +void TransactionRouter::Router::_onNewTransaction(OperationContext* opCtx) { auto tickSource = opCtx->getServiceContext()->getTickSource(); - _timingStats.startTime = tickSource->getTicks(); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).timingStats.startTime = tickSource->getTicks(); + } auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); routerTxnMetrics->incrementTotalStarted(); } -void TransactionRouter::_onBeginRecoveringDecision(OperationContext* opCtx) { +void TransactionRouter::Router::_onBeginRecoveringDecision(OperationContext* opCtx) { auto tickSource = opCtx->getServiceContext()->getTickSource(); - _timingStats.startTime = tickSource->getTicks(); + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).timingStats.startTime = tickSource->getTicks(); + } auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); routerTxnMetrics->incrementTotalStarted(); } -void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status& errorStatus) { - if (_commitType != CommitType::kNotInitiated && _timingStats.endTime == 0) { +void TransactionRouter::Router::_onImplicitAbort(OperationContext* opCtx, + const Status& errorStatus) { + if (o().commitType != CommitType::kNotInitiated && o().timingStats.endTime == 0) { // If commit was started but an end time wasn't set, then we don't know the commit result // and can't consider the transaction over until the client retries commit and definitively // learns the result. Note that this behavior may lead to no logging in some cases, but @@ -1124,58 +1232,62 @@ void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status& // Implicit abort may execute multiple times if a misbehaving client keeps sending statements // for a txnNumber after receiving an error, so only remember the first abort cause. - if (_abortCause.empty()) { - _abortCause = errorStatus.codeString(); + if (o().abortCause.empty()) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).abortCause = errorStatus.codeString(); } _endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted); } -void TransactionRouter::_onExplicitAbort(OperationContext* opCtx) { +void TransactionRouter::Router::_onExplicitAbort(OperationContext* opCtx) { // A behaving client should never try to commit after attempting to abort, so we can consider // the transaction terminated as soon as explicit abort is observed. - if (_abortCause.empty()) { + if (o().abortCause.empty()) { // Note this code means the abort was from a user abortTransaction command. - _abortCause = "abort"; + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).abortCause = "abort"; } _endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted); } -void TransactionRouter::_onStartCommit(OperationContext* opCtx) { - invariant(_commitType != CommitType::kNotInitiated); +void TransactionRouter::Router::_onStartCommit(WithLock wl, OperationContext* opCtx) { + invariant(o().commitType != CommitType::kNotInitiated); - if (_timingStats.commitStartTime != 0) { + if (o().timingStats.commitStartTime != 0) { return; } auto tickSource = opCtx->getServiceContext()->getTickSource(); - _timingStats.commitStartTime = tickSource->getTicks(); + o(wl).timingStats.commitStartTime = tickSource->getTicks(); auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); - routerTxnMetrics->incrementCommitInitiated(_commitType); - if (_commitType != CommitType::kRecoverWithToken) { + routerTxnMetrics->incrementCommitInitiated(o().commitType); + if (o().commitType != CommitType::kRecoverWithToken) { // We only know the participant list if we're not recovering a decision. - routerTxnMetrics->addToTotalParticipantsAtCommit(_participants.size()); + routerTxnMetrics->addToTotalParticipantsAtCommit(o().participants.size()); } } -void TransactionRouter::_onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus) { +void TransactionRouter::Router::_onNonRetryableCommitError(OperationContext* opCtx, + Status commitStatus) { // If the commit failed with a command error that can't be retried on, the transaction shouldn't // be able to eventually commit, so it can be considered over from the router's perspective. - if (_abortCause.empty()) { - _abortCause = commitStatus.codeString(); + if (o().abortCause.empty()) { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).abortCause = commitStatus.codeString(); } _endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted); } -void TransactionRouter::_onSuccessfulCommit(OperationContext* opCtx) { +void TransactionRouter::Router::_onSuccessfulCommit(OperationContext* opCtx) { _endTransactionTrackingIfNecessary(opCtx, TerminationCause::kCommitted); } -void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opCtx, - TerminationCause terminationCause) { - if (_timingStats.endTime != 0) { +void TransactionRouter::Router::_endTransactionTrackingIfNecessary( + OperationContext* opCtx, TerminationCause terminationCause) { + if (o().timingStats.endTime != 0) { // If the transaction was already ended, don't end it again. return; } @@ -1183,21 +1295,25 @@ void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opC auto tickSource = opCtx->getServiceContext()->getTickSource(); auto curTicks = tickSource->getTicks(); - _timingStats.endTime = curTicks; + { + stdx::lock_guard<Client> lk(*opCtx->getClient()); + o(lk).timingStats.endTime = curTicks; + } if (shouldLog(logger::LogComponent::kTransaction, logger::LogSeverity::Debug(1)) || - _timingStats.getDuration(tickSource, curTicks) > Milliseconds(serverGlobalParams.slowMS)) { + o().timingStats.getDuration(tickSource, curTicks) > + Milliseconds(serverGlobalParams.slowMS)) { _logSlowTransaction(opCtx, terminationCause); } auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx); if (terminationCause == TerminationCause::kAborted) { routerTxnMetrics->incrementTotalAborted(); - routerTxnMetrics->incrementAbortCauseMap(_abortCause); + routerTxnMetrics->incrementAbortCauseMap(o().abortCause); } else { routerTxnMetrics->incrementTotalCommitted(); routerTxnMetrics->incrementCommitSuccessful( - _commitType, _timingStats.getCommitDuration(tickSource, curTicks)); + o().commitType, o().timingStats.getCommitDuration(tickSource, curTicks)); } } diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 1577880a92c..4d442f3a225 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -47,7 +47,17 @@ namespace mongo { * Keeps track of the transaction state. A session is in use when it is being used by a request. */ class TransactionRouter { + struct PrivateState; + struct ObservableState; + public: + TransactionRouter(); + TransactionRouter(const TransactionRouter&) = delete; + TransactionRouter& operator=(const TransactionRouter&) = delete; + TransactionRouter(TransactionRouter&&) = delete; + TransactionRouter& operator=(TransactionRouter&&) = delete; + ~TransactionRouter(); + // The type of commit initiated for this transaction. enum class CommitType { kNotInitiated, @@ -86,6 +96,7 @@ public: Participant(bool isCoordinator, StmtId stmtIdCreatedAt, + ReadOnly inReadOnly, SharedTransactionOptions sharedOptions); /** @@ -98,13 +109,13 @@ public: // Is updated to kReadOnly or kNotReadOnly based on the readOnly field in the participant's // responses to statements. - ReadOnly readOnly{ReadOnly::kUnset}; - - // The highest statement id of the request during which this participant was created. - const StmtId stmtIdCreatedAt; + const ReadOnly readOnly{ReadOnly::kUnset}; // Returns the shared transaction options this participant was created with const SharedTransactionOptions sharedOptions; + + // The highest statement id of the request during which this participant was created. + const StmtId stmtIdCreatedAt; }; // Container for timing stats for the current transaction. Includes helpers for calculating some @@ -174,368 +185,418 @@ public: LogicalTime _atClusterTime; }; - TransactionRouter(); - TransactionRouter(const TransactionRouter&) = delete; - TransactionRouter& operator=(const TransactionRouter&) = delete; - TransactionRouter(TransactionRouter&&) = delete; - TransactionRouter& operator=(TransactionRouter&&) = delete; - ~TransactionRouter(); - /** - * Extract the runtime state attached to the operation context. Returns nullptr if none is - * attached. + * Class used by observers to examine the state of a TransactionRouter. */ - static TransactionRouter* get(OperationContext* opCtx); - static TransactionRouter* get(const ObservableSession& osession); + class Observer { + public: + explicit Observer(const ObservableSession& session); - /** - * Starts a fresh transaction in this session or continue an existing one. Also cleans up the - * previous transaction state. - */ - void beginOrContinueTxn(OperationContext* opCtx, - TxnNumber txnNumber, - TransactionActions action); + protected: + explicit Observer(TransactionRouter* tr) : _tr(tr) {} - /** - * Attaches the required transaction related fields for a request to be sent to the given - * shard. - * - * Calling this method has the following side effects: - * 1. Potentially selecting a coordinator. - * 2. Adding the shard to the list of participants. - * 3. Also append fields for first statements (ex. startTransaction, readConcern) - * if the shard was newly added to the list of participants. - */ - BSONObj attachTxnFieldsIfNeeded(OperationContext* opCtx, - const ShardId& shardId, - const BSONObj& cmdObj); + const TransactionRouter::ObservableState& o() const { + return _tr->_o; + } - /** - * Processes the transaction metadata in the response from the participant if the response - * indicates the operation succeeded. - */ - void processParticipantResponse(const ShardId& shardId, const BSONObj& responseObj); + TransactionRouter* _tr; + }; // class Observer /** - * Returns true if the current transaction can retry on a stale version error from a contacted - * shard. This is always true except for an error received by a write that is not the first - * overall statement in the sharded transaction. This is because the entire command will be - * retried, and shards that were not stale and are targeted again may incorrectly execute the - * command a second time. - * - * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of the - * shards that returned a stale version error was involved in a previously completed a statement - * for this transaction. - * - * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow - * retrying writes beyond the first overall statement. + * Class used by a thread that has checked out the TransactionRouter's session to observe + * and modify the transaction router. */ - bool canContinueOnStaleShardOrDbError(StringData cmdName) const; + class Router : public Observer { + public: + explicit Router(OperationContext* opCtx); - /** - * Updates the transaction state to allow for a retry of the current command on a stale version - * error. This includes sending abortTransaction to all cleared participants. Will throw if the - * transaction cannot be continued. - */ - void onStaleShardOrDbError(OperationContext* opCtx, - StringData cmdName, - const Status& errorStatus); + explicit operator bool() const { + return _tr; + } - /** - * Returns true if the current transaction can retry on a snapshot error. This is only true on - * the first command recevied for a transaction. - */ - bool canContinueOnSnapshotError() const; + /** + * Starts a fresh transaction in this session or continue an existing one. Also cleans up the + * previous transaction state. + */ + void beginOrContinueTxn(OperationContext* opCtx, + TxnNumber txnNumber, + TransactionActions action); - /** - * Resets the transaction state to allow for a retry attempt. This includes clearing all - * participants, clearing the coordinator, resetting the global read timestamp, and sending - * abortTransaction to all cleared participants. Will throw if the transaction cannot be - * continued. - */ - void onSnapshotError(OperationContext* opCtx, const Status& errorStatus); + /** + * Attaches the required transaction related fields for a request to be sent to the given + * shard. + * + * Calling this method has the following side effects: + * 1. Potentially selecting a coordinator. + * 2. Adding the shard to the list of participants. + * 3. Also append fields for first statements (ex. startTransaction, readConcern) + * if the shard was newly added to the list of participants. + */ + BSONObj attachTxnFieldsIfNeeded(OperationContext* opCtx, + const ShardId& shardId, + const BSONObj& cmdObj); - /** - * Updates the transaction tracking state to allow for a retry attempt on a view resolution - * error. This includes sending abortTransaction to all cleared participants. - */ - void onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss); + /** + * Processes the transaction metadata in the response from the participant if the response + * indicates the operation succeeded. + */ + void processParticipantResponse(OperationContext* opCtx, + const ShardId& shardId, + const BSONObj& responseObj); - /** - * Sets the atClusterTime for the current transaction to the latest time in the router's logical - * clock. Does nothing if the transaction does not have snapshot read concern or an - * atClusterTime has already been selected and cannot be changed. - */ - void setDefaultAtClusterTime(OperationContext* opCtx); + /** + * Returns true if the current transaction can retry on a stale version error from a + * contacted shard. This is always true except for an error received by a write that is not + * the first overall statement in the sharded transaction. This is because the entire + * command will be retried, and shards that were not stale and are targeted again may + * incorrectly execute the command a second time. + * + * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of + * the shards that returned a stale version error was involved in a previously completed a + * statement for this transaction. + * + * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to + * allow retrying writes beyond the first overall statement. + */ + bool canContinueOnStaleShardOrDbError(StringData cmdName) const; - /** - * Returns the global read timestamp for this transaction. Returns boost::none for transactions - * that don't run at snapshot level read concern or if a timestamp has not yet been selected. - */ - const boost::optional<AtClusterTime>& getAtClusterTime() const; + /** + * Updates the transaction state to allow for a retry of the current command on a stale + * version error. This includes sending abortTransaction to all cleared participants. Will + * throw if the transaction cannot be continued. + */ + void onStaleShardOrDbError(OperationContext* opCtx, + StringData cmdName, + const Status& errorStatus); - /** - * If a coordinator has been selected for the current transaction, returns its id. - */ - const boost::optional<ShardId>& getCoordinatorId() const; + /** + * Returns true if the current transaction can retry on a snapshot error. This is only true + * on the first command recevied for a transaction. + */ + bool canContinueOnSnapshotError() const; - /** - * If a recovery shard has been selected for the current transaction, returns its id. - */ - const boost::optional<ShardId>& getRecoveryShardId() const; + /** + * Resets the transaction state to allow for a retry attempt. This includes clearing all + * participants, clearing the coordinator, resetting the global read timestamp, and sending + * abortTransaction to all cleared participants. Will throw if the transaction cannot be + * continued. + */ + void onSnapshotError(OperationContext* opCtx, const Status& errorStatus); - /** - * Commits the transaction. - * - * For transactions that only did reads or only wrote to one shard, sends commit directly to the - * participants and returns the first error response or the last (success) response. - * - * For transactions that performed writes to multiple shards, hands off the participant list to - * the coordinator to do two-phase commit, and returns the coordinator's response. - */ - BSONObj commitTransaction(OperationContext* opCtx, - const boost::optional<TxnRecoveryToken>& recoveryToken); + /** + * Updates the transaction tracking state to allow for a retry attempt on a view resolution + * error. This includes sending abortTransaction to all cleared participants. + */ + void onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss); - /** - * Sends abort to all participants. - * - * Returns the first error response or the last (success) response. - */ - BSONObj abortTransaction(OperationContext* opCtx); + /** + * Returns true if the associated transaction is running at snapshot level read concern. + */ + bool mustUseAtClusterTime() const; - /** - * Sends abort to all shards in the current participant list. Will retry on retryable errors, - * but ignores the responses from each shard. - */ - void implicitlyAbortTransaction(OperationContext* opCtx, const Status& errorStatus); + /** + * Returns the read timestamp for this transaction. Callers must verify that the read + * timestamp has been selected for this transaction before calling this function. + */ + LogicalTime getSelectedAtClusterTime() const; - /** - * Returns the participant for this transaction or nullptr if the specified shard is not - * participant of this transaction. - */ - Participant* getParticipant(const ShardId& shard); + /** + * Sets the atClusterTime for the current transaction to the latest time in the router's + * logical clock. Does nothing if the transaction does not have snapshot read concern or an + * atClusterTime has already been selected and cannot be changed. + */ + void setDefaultAtClusterTime(OperationContext* opCtx); - /** - * If a coordinator has been selected for this transaction already, constructs a recovery token, - * which can be used to resume commit or abort of the transaction from a different router. - */ - void appendRecoveryToken(BSONObjBuilder* builder) const; + /** + * If a coordinator has been selected for the current transaction, returns its id. + */ + const boost::optional<ShardId>& getCoordinatorId() const; - /** - * Returns a string with the active transaction's transaction number and logical session id - * (i.e. the transaction id). - */ - std::string txnIdToString() const; + /** + * If a recovery shard has been selected for the current transaction, returns its id. + */ + const boost::optional<ShardId>& getRecoveryShardId() const; - /** - * Returns the statement id of the latest received command for this transaction. - */ - StmtId getLatestStmtId() const { - return _latestStmtId; - } + /** + * Commits the transaction. + * + * For transactions that only did reads or only wrote to one shard, sends commit directly to + * the participants and returns the first error response or the last (success) response. + * + * For transactions that performed writes to multiple shards, hands off the participant list + * to the coordinator to do two-phase commit, and returns the coordinator's response. + */ + BSONObj commitTransaction(OperationContext* opCtx, + const boost::optional<TxnRecoveryToken>& recoveryToken); - /** - * Returns a copy of the timing stats of the transaction router's active transaction. - */ - TimingStats getTimingStats() const { - return _timingStats; - } + /** + * Sends abort to all participants. + * + * Returns the first error response or the last (success) response. + */ + BSONObj abortTransaction(OperationContext* opCtx); -private: - // Helper to convert the CommitType enum into a human readable string for diagnostics. - std::string _commitTypeToString(CommitType state) const { - switch (state) { - case CommitType::kNotInitiated: - return "notInitiated"; - case CommitType::kNoShards: - return "noShards"; - case CommitType::kSingleShard: - return "singleShard"; - case CommitType::kSingleWriteShard: - return "singleWriteShard"; - case CommitType::kReadOnly: - return "readOnly"; - case CommitType::kTwoPhaseCommit: - return "twoPhaseCommit"; - case CommitType::kRecoverWithToken: - return "recoverWithToken"; + /** + * Sends abort to all shards in the current participant list. Will retry on retryable errors, + * but ignores the responses from each shard. + */ + void implicitlyAbortTransaction(OperationContext* opCtx, const Status& errorStatus); + + /** + * If a coordinator has been selected for this transaction already, constructs a recovery + * token, which can be used to resume commit or abort of the transaction from a different + * router. + */ + void appendRecoveryToken(BSONObjBuilder* builder) const; + + /** + * Returns a string with the active transaction's transaction number and logical session id + * (i.e. the transaction id). + */ + std::string txnIdToString() const; + + /** + * Returns the participant for this transaction or nullptr if the specified shard is not + * participant of this transaction. + */ + const Participant* getParticipant(const ShardId& shard); + + /** + * Returns the statement id of the latest received command for this transaction. + */ + StmtId getLatestStmtId() const { + return p().latestStmtId; } - MONGO_UNREACHABLE; - } - /** - * Prints slow transaction information to the log. - */ - void _logSlowTransaction(OperationContext* opCtx, TerminationCause terminationCause) const; + /** + * Returns a copy of the timing stats of the transaction router's active transaction. + */ + const TimingStats& getTimingStats() const { + return o().timingStats; + } - /** - * Returns a string to be logged for slow transactions. - */ - std::string _transactionInfoForLog(OperationContext* opCtx, - TerminationCause terminationCause) const; + private: + /** + * Resets the router's state. Used when the router sees a new transaction for the first time. + * This is required because we don't create a new router object for each transaction, but + * instead reuse the same object across different transactions. + */ + void _resetRouterState(OperationContext* opCtx, const TxnNumber& txnNumber); - // Shortcut to obtain the id of the session under which this transaction router runs - const LogicalSessionId& _sessionId() const; + /** + * Internal method for committing a transaction. Should only throw on failure to send commit. + */ + BSONObj _commitTransaction(OperationContext* opCtx, + const boost::optional<TxnRecoveryToken>& recoveryToken); - /** - * Resets the router's state. Used when the router sees a new transaction for the first time. - * This is required because we don't create a new router object for each transaction, but - * instead reuse the same object across different transactions. - */ - void _resetRouterState(const TxnNumber& txnNumber); + /** + * Retrieves the transaction's outcome from the shard specified in the recovery token. + */ + BSONObj _commitWithRecoveryToken(OperationContext* opCtx, + const TxnRecoveryToken& recoveryToken); - /** - * Internal method for committing a transaction. Should only throw on failure to send commit. - */ - BSONObj _commitTransaction(OperationContext* opCtx, - const boost::optional<TxnRecoveryToken>& recoveryToken); + /** + * Hands off coordinating a two-phase commit across all participants to the coordinator + * shard. + */ + BSONObj _handOffCommitToCoordinator(OperationContext* opCtx); - /** - * Retrieves the transaction's outcome from the shard specified in the recovery token. - */ - BSONObj _commitWithRecoveryToken(OperationContext* opCtx, - const TxnRecoveryToken& recoveryToken); + /** + * Sets the given logical time as the atClusterTime for the transaction to be the greater of + * the given time and the user's afterClusterTime, if one was provided. + */ + void _setAtClusterTime(OperationContext* opCtx, + const boost::optional<LogicalTime>& afterClusterTime, + LogicalTime candidateTime); - /** - * Hands off coordinating a two-phase commit across all participants to the coordinator shard. - */ - BSONObj _handOffCommitToCoordinator(OperationContext* opCtx); + /** + * Throws NoSuchTransaction if the response from abortTransaction failed with a code other + * than NoSuchTransaction. Does not check for write concern errors. + */ + void _assertAbortStatusIsOkOrNoSuchTransaction( + const AsyncRequestsSender::Response& response) const; - /** - * Sets the given logical time as the atClusterTime for the transaction to be the greater of the - * given time and the user's afterClusterTime, if one was provided. - */ - void _setAtClusterTime(const boost::optional<LogicalTime>& afterClusterTime, - LogicalTime candidateTime); + /** + * If the transaction's read concern level is snapshot, asserts the participant's + * atClusterTime matches the transaction's. + */ + void _verifyParticipantAtClusterTime(const Participant& participant); - /** - * Throws NoSuchTransaction if the response from abortTransaction failed with a code other than - * NoSuchTransaction. Does not check for write concern errors. - */ - void _assertAbortStatusIsOkOrNoSuchTransaction( - const AsyncRequestsSender::Response& response) const; + /** + * Removes all participants created during the current statement from the participant list + * and sends abortTransaction to each. Waits for all responses before returning. + */ + void _clearPendingParticipants(OperationContext* opCtx); - /** - * Returns all participants created during the current statement. - */ - std::vector<ShardId> _getPendingParticipants() const; + /** + * Creates a new participant for the shard. + */ + TransactionRouter::Participant& _createParticipant(OperationContext* opCtx, + const ShardId& shard); - /** - * Removes all participants created during the current statement from the participant list and - * sends abortTransaction to each. Waits for all responses before returning. - */ - void _clearPendingParticipants(OperationContext* opCtx); + /** + * Sets the new readOnly value for the current participant on the shard. + */ + void _setReadOnlyForParticipant(OperationContext* opCtx, + const ShardId& shard, + const Participant::ReadOnly readOnly); - /** - * Creates a new participant for the shard. - */ - Participant& _createParticipant(const ShardId& shard); + /** + * Updates relevant metrics when a new transaction is begun. + */ + void _onNewTransaction(OperationContext* opCtx); - /** - * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime - * matches the transaction's. - */ - void _verifyParticipantAtClusterTime(const Participant& participant); + /** + * Updates relevant metrics when a router receives commit for a higher txnNumber than it has + * seen so far. + */ + void _onBeginRecoveringDecision(OperationContext* opCtx); - /** - * Updates relevant metrics when a new transaction is begun. - */ - void _onNewTransaction(OperationContext* opCtx); + /** + * Updates relevant metrics when the router receives an explicit abort from the client. + */ + void _onExplicitAbort(OperationContext* opCtx); - /** - * Updates relevant metrics when a router receives commit for a higher txnNumber than it has - * seen so far. - */ - void _onBeginRecoveringDecision(OperationContext* opCtx); + /** + * Updates relevant metrics when the router begins an implicit abort after an error. + */ + void _onImplicitAbort(OperationContext* opCtx, const Status& errorStatus); - /** - * Updates relevant metrics when the router receives an explicit abort from the client. - */ - void _onExplicitAbort(OperationContext* opCtx); + /** + * Updates relevant metrics when a transaction is about to begin commit. + */ + void _onStartCommit(WithLock wl, OperationContext* opCtx); - /** - * Updates relevant metrics when the router begins an implicit abort after an error. - */ - void _onImplicitAbort(OperationContext* opCtx, const Status& errorStatus); + /** + * Updates relevant metrics when a transaction receives a successful response for commit. + */ + void _onSuccessfulCommit(OperationContext* opCtx); - /** - * Updates relevant metrics when a transaction is about to begin commit. - */ - void _onStartCommit(OperationContext* opCtx); + /** + * Updates relevant metrics when commit receives a response with a non-retryable command + * error per the retryable writes specification. + */ + void _onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus); - /** - * Updates relevant metrics when a transaction receives a successful response for commit. - */ - void _onSuccessfulCommit(OperationContext* opCtx); + /** + * The first time this method is called it marks the transaction as over in the router's + * diagnostics and will log transaction information if its duration is over the global slowMS + * threshold or the transaction log componenet verbosity >= 1. Only meant to be called when + * the router definitively knows the transaction's outcome, e.g. it should not be invoked + * after a network error on commit. + */ + void _endTransactionTrackingIfNecessary(OperationContext* opCtx, + TerminationCause terminationCause); - /** - * Updates relevant metrics when commit receives a response with a non-retryable command error - * per the retryable writes specification. - */ - void _onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus); + /** + * Returns all participants created during the current statement. + */ + std::vector<ShardId> _getPendingParticipants() const; + + /** + * Prints slow transaction information to the log. + */ + void _logSlowTransaction(OperationContext* opCtx, TerminationCause terminationCause) const; + + /** + * Returns a string to be logged for slow transactions. + */ + std::string _transactionInfoForLog(OperationContext* opCtx, + TerminationCause terminationCause) const; + + // Shortcut to obtain the id of the session under which this transaction router runs + const LogicalSessionId& _sessionId() const; + + TransactionRouter::PrivateState& p() { + return _tr->_p; + } + const TransactionRouter::PrivateState& p() const { + return _tr->_p; + } + TransactionRouter::ObservableState& o(WithLock) { + return _tr->_o; + } + using Observer::o; + }; // class Router + + static Router get(OperationContext* opCtx) { + return Router(opCtx); + } + + static Observer get(const ObservableSession& osession) { + return Observer(osession); + } + +private: /** - * The first time this method is called it marks the transaction as over in the router's - * diagnostics and will log transaction information if its duration is over the global slowMS - * threshold or the transaction log componenet verbosity >= 1. Only meant to be called when the - * router definitively knows the transaction's outcome, e.g. it should not be invoked after a - * network error on commit. + * State in this struct may be read by methods of Observer or Router, and may be written by + * methods of Router when they acquire the lock on the opCtx's Client. Access this inside + * Observer and Router using the private o() method for reading and (Router only) the + * o(WithLock) method for writing. */ - void _endTransactionTrackingIfNecessary(OperationContext* opCtx, - TerminationCause terminationCause); - - // The currently active transaction number on this router, if beginOrContinueTxn has been - // called. Otherwise set to kUninitializedTxnNumber. - TxnNumber _txnNumber{kUninitializedTxnNumber}; + struct ObservableState { + // The currently active transaction number on this router, if beginOrContinueTxn has been + // called. Otherwise set to kUninitializedTxnNumber. + TxnNumber txnNumber{kUninitializedTxnNumber}; - // Is updated at commit time to reflect which commit path was taken. - CommitType _commitType{CommitType::kNotInitiated}; + // Is updated at commit time to reflect which commit path was taken. + CommitType commitType{CommitType::kNotInitiated}; - // Indicates whether this is trying to recover a commitTransaction on the current transaction. - bool _isRecoveringCommit{false}; + // Map of current participants of the current transaction. + StringMap<Participant> participants; - // Map of current participants of the current transaction. - StringMap<Participant> _participants; + // The id of participant chosen as the two-phase commit coordinator. If, at commit time, + // two-phase commit is required, the participant list is handed off to this shard. Is unset + // until the transaction has targeted a participant, and is set to the first participant + // targeted. Is reset if the first participant targeted returns a "needs retargeting" error. + boost::optional<ShardId> coordinatorId; - // The id of participant chosen as the two-phase commit coordinator. If, at commit time, - // two-phase commit is required, the participant list is handed off to this shard. Is unset - // until the transaction has targeted a participant, and is set to the first participant - // targeted. Is reset if the first participant targeted returns a "needs retargeting" error. - boost::optional<ShardId> _coordinatorId; + // The read concern the current transaction was started with. + repl::ReadConcernArgs readConcernArgs; - // The id of the recovery participant. Passed in the recoveryToken that is included on responses - // to the client. Is unset until the transaction has done a write, and is set to the first - // participant that reports having done a write. Is reset if that participant is removed from - // the participant list because another participant targeted in the same statement returned a - // "needs retargeting" error. - boost::optional<ShardId> _recoveryShardId; + // The cluster time of the timestamp all participant shards in the current transaction with + // snapshot level read concern must read from. Only set for transactions running with + // snapshot level read concern. + boost::optional<AtClusterTime> atClusterTime; - // The read concern the current transaction was started with. - repl::ReadConcernArgs _readConcernArgs; + // String representing the reason a transaction aborted. Either the string name of the error + // code that led to an implicit abort or "abort" if the client sent abortTransaction. + std::string abortCause; - // The cluster time of the timestamp all participant shards in the current transaction with - // snapshot level read concern must read from. Only set for transactions running with snapshot - // level read concern. - boost::optional<AtClusterTime> _atClusterTime; + // Stats used for calculating durations for the active transaction. + TimingStats timingStats; + } _o; - // The statement id of the latest received command for this transaction. For batch writes, this - // will be the highest stmtId contained in the batch. Incremented by one if new commands do not - // contain statement ids. - StmtId _latestStmtId{kDefaultFirstStmtId}; + /** + * State in this struct may be read and written by methods of the Router, only. It may + * access the struct via the private p() accessor. No further locking is required in methods of + * the Router. + */ + struct PrivateState { + // Indicates whether this is trying to recover a commitTransaction on the current + // transaction. + bool isRecoveringCommit{false}; - // The statement id of the command that began this transaction. Defaults to zero if no statement - // id was included in the first command. - StmtId _firstStmtId{kDefaultFirstStmtId}; + // The id of the recovery participant. Passed in the recoveryToken that is included on + // responses to the client. Is unset until the transaction has done a write, and is set to + // the first participant that reports having done a write. Is reset if that participant is + // removed from the participant list because another participant targeted in the same + // statement returned a "needs retargeting" error. + boost::optional<ShardId> recoveryShardId; - // String representing the reason a transaction aborted. Either the string name of the error - // code that led to an implicit abort or "abort" if the client sent abortTransaction. - std::string _abortCause; + // The statement id of the latest received command for this transaction. For batch writes, + // this will be the highest stmtId contained in the batch. Incremented by one if new + // commands do not contain statement ids. + StmtId latestStmtId{kDefaultFirstStmtId}; - // Stats used for calculating durations for the active transaction. - TimingStats _timingStats; + // The statement id of the command that began this transaction. Defaults to zero if no + // statement id was included in the first command. + StmtId firstStmtId{kDefaultFirstStmtId}; - // Track whether commit or abort have been initiated. - bool _terminationInitiated{false}; + // Track whether commit or abort have been initiated. + bool terminationInitiated{false}; + } _p; }; } // namespace mongo diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index e3e083797d4..64f662e2b01 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -222,7 +222,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartTxnShouldBeAttachedOnlyOnFirstStatementToParticipant) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -271,7 +271,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -320,7 +320,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue), @@ -331,7 +331,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndReadConcern) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -416,7 +416,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -477,7 +477,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -519,7 +519,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForReadOnlyTransaction) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -532,12 +532,12 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea ASSERT_FALSE(txnRouter.getRecoveryShardId()); // The recovery shard is not set if a participant responds with ok but that it is read-only. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT_FALSE(txnRouter.getRecoveryShardId()); // The recovery shard is not set even if more read-only participants respond. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); ASSERT_FALSE(txnRouter.getRecoveryShardId()); txnRouter.beginOrContinueTxn( @@ -561,13 +561,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnFirstStatement) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); } @@ -576,7 +576,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnLaterStatement) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -584,11 +584,11 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); // Response to first statement says read-only. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT_FALSE(txnRouter.getRecoveryShardId()); // Response to second statement says not read-only. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); } @@ -597,19 +597,19 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSecondParticipantIfSecondParticipantIsFirstToDoAWrite) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says read-only. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT_FALSE(txnRouter.getRecoveryShardId()); // Shard2's response says not read-only. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard2); } @@ -618,14 +618,14 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetIfRecoveryParticipantIsPendingAndPendingParticipantsAreCleared) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); @@ -642,14 +642,14 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsNotResetIfRecoveryParticipantIsNotPendingAndPendingParticipantsAreCleared) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); @@ -661,7 +661,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, // Shard2 responds, it doesn't matter whether it's read-only, just that it's a pending // participant. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); @@ -678,14 +678,14 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNewTransaction) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); // Shard1's response says not read-only. txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(txnRouter.getRecoveryShardId()); ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1); @@ -700,7 +700,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyThere) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -735,7 +735,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, "invariant") { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -751,7 +751,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfAlreadyOnCmd) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -786,7 +786,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFirstStatement) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -803,7 +803,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToPart TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -834,7 +834,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -864,7 +864,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel); TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), @@ -879,7 +879,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel); TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), @@ -894,7 +894,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel); TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart), @@ -906,7 +906,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -946,7 +946,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CommitTransactionWithNoParticipantsDoesNotSendCommit) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -967,13 +967,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatIsReadOnly) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1003,13 +1003,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatDidAWrite) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1039,15 +1039,15 @@ TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); @@ -1088,15 +1088,15 @@ TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyToReadOnlyShardsThenWriteShardForMultipleParticipantsOnlyOneDidAWrite) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); @@ -1139,15 +1139,15 @@ TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); @@ -1196,13 +1196,13 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1225,10 +1225,10 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { // Sending commit with a recovery token again should cause the router to use the recovery path // again. - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); - future = launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1262,14 +1262,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, auto txnRouter = TransactionRouter::get(opCtx); // Simulate recovering a commit with a recovery token and no participants. { - txnRouter->beginOrContinueTxn( - opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1294,23 +1293,23 @@ TEST_F(TransactionRouterTestWithDefaultSession, // should be sent with the correct participant list. { ++txnNum; - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1352,23 +1351,23 @@ TEST_F(TransactionRouterTestWithDefaultSession, // Run a cross-shard transaction with two-phase commit. The commit should be sent with the // correct participant list. { - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1400,14 +1399,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, { ++txnNum; - txnRouter->beginOrContinueTxn( - opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(shard1); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); onCommand([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1442,10 +1440,10 @@ TEST_F(TransactionRouterTest, CommitWithEmptyRecoveryToken) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; - ASSERT_THROWS_CODE(txnRouter->commitTransaction(operationContext(), recoveryToken), + ASSERT_THROWS_CODE(txnRouter.commitTransaction(operationContext(), recoveryToken), AssertionException, ErrorCodes::NoSuchTransaction); } @@ -1463,13 +1461,13 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; recoveryToken.setRecoveryShardId(ShardId("magicShard")); auto future = - launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); }); + launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); }); ShardType shardType; shardType.setName(shard1.toString()); @@ -1484,7 +1482,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) { TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1534,7 +1532,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotChangeAtClusterTimeAfterStatementThatSelectedIt) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1597,7 +1595,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipants) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1647,7 +1645,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAfterFirstCommand) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1669,7 +1667,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1723,7 +1721,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1768,7 +1766,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClearedOnStaleError) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1812,7 +1810,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -1866,7 +1864,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1904,11 +1902,11 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); ASSERT_THROWS_CODE( - txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction); + txnRouter.abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction); } TEST_F(TransactionRouterTest, AbortForSingleParticipant) { @@ -1922,11 +1920,11 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); }); onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -1955,14 +1953,14 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); - auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); }); std::map<HostAndPort, boost::optional<bool>> targets = {{hostAndPort1, true}, {hostAndPort2, {}}}; @@ -1998,16 +1996,16 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard3, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyFalseResponse); - auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); }); std::map<HostAndPort, boost::optional<bool>> targets = { {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}}; @@ -2047,16 +2045,16 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard3, {}); - txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse); - txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {}); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyFalseResponse); - auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); }); + auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); }); std::map<HostAndPort, boost::optional<bool>> targets = { {hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}}; @@ -2092,7 +2090,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2151,11 +2149,11 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); // Should not throw. - txnRouter->implicitlyAbortTransaction(opCtx, kDummyStatus); + txnRouter.implicitlyAbortTransaction(opCtx, kDummyStatus); } TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { @@ -2169,12 +2167,12 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); auto future = launchAsync( - [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); }); + [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); }); onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -2202,13 +2200,13 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {}); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); auto future = launchAsync( - [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); }); + [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); }); std::map<HostAndPort, boost::optional<bool>> targets = {{hostAndPort1, true}, {hostAndPort2, {}}}; @@ -2243,12 +2241,12 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { RouterOperationContextSession scopedSession(opCtx); auto txnRouter = TransactionRouter::get(opCtx); - txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); - txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {}); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.setDefaultAtClusterTime(operationContext()); + txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); auto future = launchAsync( - [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); }); + [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); }); onCommandForPoolExecutor([&](const RemoteCommandRequest& request) { ASSERT_EQ(hostAndPort1, request.target); @@ -2270,7 +2268,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotOrStaleVersionErrorsWithoutFailpoint) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2291,7 +2289,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsReadConcernOnOpCtx) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2308,7 +2306,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SubsequentStatementCanSelectAtClusterTimeIfNotSelectedYet) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -2356,16 +2354,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClu for (auto rcIt : supportedNonSnapshotRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); // No atClusterTime is placed on the router by default. - ASSERT_FALSE(txnRouter.getAtClusterTime()); + ASSERT_FALSE(txnRouter.mustUseAtClusterTime()); // Can't compute and set an atClusterTime. txnRouter.setDefaultAtClusterTime(operationContext()); - ASSERT_FALSE(txnRouter.getAtClusterTime()); + ASSERT_FALSE(txnRouter.mustUseAtClusterTime()); // Can't continue on snapshot errors. ASSERT_FALSE(txnRouter.canContinueOnSnapshotError()); @@ -2378,7 +2376,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, for (auto rcIt : supportedNonSnapshotRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2415,7 +2413,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(clusterTime, rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2436,7 +2434,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres for (auto rcIt : supportedNonSnapshotRCLevels) { repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart); @@ -2456,7 +2454,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesIgnoresNoSuchTransaction) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -2486,7 +2484,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesUsesIdempotentRetryPolicy) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -2518,7 +2516,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesFailsWithNoSuchTransactionOnUnexpectedErrors) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -2550,7 +2548,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, "Participant should exist if processing participant response") { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2560,20 +2558,20 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {}); // Simulate response from some participant not in the list. - txnRouter.processParticipantResponse(shard3, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyTrueResponse); } TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseDoesNotUpdateParticipantIfResponseStatusIsNotOk) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, BSON("ok" << 0)); + txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0)); ASSERT(TransactionRouter::Participant::ReadOnly::kUnset == txnRouter.getParticipant(shard1)->readOnly); } @@ -2582,13 +2580,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseMarksParticipantAsReadOnlyIfResponseSaysReadOnlyTrue) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); const auto participant = txnRouter.getParticipant(shard1); @@ -2596,10 +2594,10 @@ TEST_F(TransactionRouterTestWithDefaultSession, // Further responses with readOnly: true do not change the participant's readOnly field. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly); } @@ -2607,23 +2605,23 @@ TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseMarksParticipantAsNotReadOnlyIfFirstResponseSaysReadOnlyFalse) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); const auto participant = txnRouter.getParticipant(shard1); ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly); // Further responses with readOnly: false do not change the participant's readOnly field. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly); - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly); } @@ -2632,7 +2630,7 @@ TEST_F( ProcessParticipantResponseUpdatesParticipantToReadOnlyFalseIfLaterResponseSaysReadOnlyFalse) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2640,23 +2638,25 @@ TEST_F( txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); // First response says readOnly: true. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); - const auto participant = txnRouter.getParticipant(shard1); + const auto oldParticipant = txnRouter.getParticipant(shard1); - ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly); + ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == oldParticipant->readOnly); // Later response says readOnly: false. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); - ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly); + const auto newParticipant = txnRouter.getParticipant(shard1); + + ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == newParticipant->readOnly); } TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseThrowsIfParticipantClaimsToChangeFromReadOnlyFalseToReadOnlyTrue) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2664,23 +2664,24 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); // First response says readOnly: false. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); const auto participant = txnRouter.getParticipant(shard1); ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly); // Later response says readOnly: true. - ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse), - AssertionException, - 51113); + ASSERT_THROWS_CODE( + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse), + AssertionException, + 51113); } TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseThrowsIfParticipantReturnsErrorThenSuccessOnLaterStatement) { TxnNumber txnNum{3}; - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2688,7 +2689,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {}); // First response is an error. - txnRouter.processParticipantResponse(shard1, BSON("ok" << 0)); + txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0)); const auto participant = txnRouter.getParticipant(shard1); ASSERT(TransactionRouter::Participant::ReadOnly::kUnset == participant->readOnly); @@ -2703,12 +2704,14 @@ TEST_F(TransactionRouterTestWithDefaultSession, operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue); // The router should throw regardless of whether the response says readOnly true or false. - ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse), - AssertionException, - 51112); - ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse), - AssertionException, - 51112); + ASSERT_THROWS_CODE( + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse), + AssertionException, + 51112); + ASSERT_THROWS_CODE( + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse), + AssertionException, + 51112); } TEST_F(TransactionRouterTestWithDefaultSession, @@ -2716,7 +2719,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto opCtx = operationContext(); - auto& txnRouter(*TransactionRouter::get(opCtx)); + auto txnRouter = TransactionRouter::get(opCtx); txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(opCtx); @@ -2732,7 +2735,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, future.default_timed_get(); // The participant's response metadata should not be processed since abort has been initiated. - txnRouter.processParticipantResponse(shard1, BSON("ok" << 0)); + txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0)); ASSERT(TransactionRouter::Participant::ReadOnly::kUnset == txnRouter.getParticipant(shard1)->readOnly); } @@ -2742,7 +2745,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto opCtx = operationContext(); - auto& txnRouter(*TransactionRouter::get(opCtx)); + auto txnRouter = TransactionRouter::get(opCtx); txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(opCtx); @@ -2754,7 +2757,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, future.default_timed_get(); // The participant's response metadata should not be processed since abort has been initiated. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kUnset == txnRouter.getParticipant(shard1)->readOnly); } @@ -2764,14 +2767,14 @@ TEST_F(TransactionRouterTestWithDefaultSession, TxnNumber txnNum{3}; auto opCtx = operationContext(); - auto& txnRouter(*TransactionRouter::get(opCtx)); + auto txnRouter = TransactionRouter::get(opCtx); txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(opCtx); txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {}); // Process !readonly response to set participant state. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == txnRouter.getParticipant(shard1)->readOnly); @@ -2784,7 +2787,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, future.default_timed_get(); // Processing readonly response should not throw since commit has been initiated. - txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); } // Begins a transaction with snapshot level read concern and sets a default cluster time. @@ -2800,7 +2803,7 @@ protected: void setUp() override { TransactionRouterTestWithDefaultSession::setUp(); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -2808,7 +2811,7 @@ protected: }; TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot, AddAtClusterTimeNormal) { - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, BSON("aggregate" @@ -2824,7 +2827,7 @@ TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot, AddingAtClusterTimeOverwritesExistingAfterClusterTime) { const Timestamp existingAfterClusterTime(1, 1); - auto& txnRouter(*TransactionRouter::get(operationContext())); + auto txnRouter = TransactionRouter::get(operationContext()); auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, BSON("aggregate" @@ -2857,8 +2860,8 @@ protected: return dynamic_cast<TickSourceMock<Microseconds>*>(getServiceContext()->getTickSource()); } - TransactionRouter& txnRouter() { - return *TransactionRouter::get(operationContext()); + TransactionRouter::Router txnRouter() { + return TransactionRouter::get(operationContext()); } void beginTxnWithDefaultTxnNumber() { @@ -2909,7 +2912,8 @@ protected: void explicitAbortInProgress() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard1, kOkReadOnlyFalseResponse); startCapturingLogMessages(); auto future = launchAsync([&] { txnRouter().abortTransaction(operationContext()); }); @@ -2920,7 +2924,8 @@ protected: void implicitAbortInProgress() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard1, kOkReadOnlyFalseResponse); startCapturingLogMessages(); auto future = launchAsync( @@ -2932,7 +2937,8 @@ protected: void runCommit(StatusWith<BSONObj> swRes, bool expectRetries = false) { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard1, kOkReadOnlyFalseResponse); startCapturingLogMessages(); auto future = launchAsync([&] { @@ -2987,7 +2993,7 @@ protected: void runSingleShardCommit() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); startCapturingLogMessages(); auto future = launchAsync( @@ -2999,9 +3005,9 @@ protected: void runReadOnlyCommit() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter().processParticipantResponse(shard2, kOkReadOnlyTrueResponse); + txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse); startCapturingLogMessages(); auto future = launchAsync( @@ -3014,9 +3020,10 @@ protected: void runSingleWriteShardCommit() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard2, kOkReadOnlyFalseResponse); startCapturingLogMessages(); auto future = launchAsync( @@ -3029,9 +3036,11 @@ protected: void runTwoPhaseCommit() { txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard1, kOkReadOnlyFalseResponse); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard2, kOkReadOnlyFalseResponse); startCapturingLogMessages(); auto future = launchAsync( @@ -3069,7 +3078,8 @@ protected: auto beginAndPauseCommit() { // Commit after targeting one shard so the commit has to do work and can be paused. txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse( + operationContext(), shard1, kOkReadOnlyFalseResponse); auto future = launchAsync( [&] { txnRouter().commitTransaction(operationContext(), kDummyRecoveryToken); }); @@ -3853,15 +3863,15 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalRequestsTargeted) { // Increases each time transaction fields are attached. txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalRequestsTargeted()); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalRequestsTargeted()); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); ASSERT_EQUALS(3L, routerTxnMetrics()->getTotalRequestsTargeted()); } @@ -3901,11 +3911,11 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) { ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit()); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); - txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse); ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit()); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {}); - txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse); + txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse); ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit()); // Increases after commit begins, before it ends. diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp index aaaaf2eb7fe..428bd7f448d 100644 --- a/src/mongo/s/write_ops/batch_write_exec_test.cpp +++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp @@ -640,9 +640,9 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); - txnRouter->setDefaultAtClusterTime(operationContext()); + txnRouter.setDefaultAtClusterTime(operationContext()); } void tearDown() override { diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp index 7c877882186..c517f92a76f 100644 --- a/src/mongo/s/write_ops/batch_write_op.cpp +++ b/src/mongo/s/write_ops/batch_write_op.cpp @@ -248,7 +248,7 @@ BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest& : _opCtx(opCtx), _clientRequest(clientRequest), _batchTxnNum(_opCtx->getTxnNumber()), - _inTransaction(TransactionRouter::get(opCtx) != nullptr) { + _inTransaction(bool(TransactionRouter::get(opCtx))) { _writeOps.reserve(_clientRequest.sizeWriteOps()); for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) { diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp index 85a289bf6d4..f41f4bfb322 100644 --- a/src/mongo/s/write_ops/batch_write_op_test.cpp +++ b/src/mongo/s/write_ops/batch_write_op_test.cpp @@ -1562,7 +1562,7 @@ public: _scopedSession.emplace(operationContext()); auto txnRouter = TransactionRouter::get(operationContext()); - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart); } diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp index 1d647c0ce30..8636c352128 100644 --- a/src/mongo/s/write_ops/write_op.cpp +++ b/src/mongo/s/write_ops/write_op.cpp @@ -76,7 +76,7 @@ Status WriteOp::targetWrites(OperationContext* opCtx, // // NOTE: Index inserts are currently specially targeted only at the current collection to avoid // creating collections everywhere. - const bool inTransaction = TransactionRouter::get(opCtx) != nullptr; + const bool inTransaction = bool(TransactionRouter::get(opCtx)); if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !inTransaction) { swEndpoints = targeter.targetAllShards(opCtx); } diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp index 21499683120..a90aa11f187 100644 --- a/src/mongo/s/write_ops/write_op_test.cpp +++ b/src/mongo/s/write_ops/write_op_test.cpp @@ -337,7 +337,7 @@ TEST_F(WriteOpTransactionTests, TargetMultiAllShardsAndErrorSingleChildOp) { opCtx()->setTxnNumber(kTxnNumber); auto txnRouter = TransactionRouter::get(opCtx()); - txnRouter->beginOrContinueTxn( + txnRouter.beginOrContinueTxn( opCtx(), kTxnNumber, TransactionRouter::TransactionActions::kStart); // Do multi-target write op |