author     Blake Oler <blake.oler@mongodb.com>   2019-06-20 11:50:28 -0400
committer  Blake Oler <blake.oler@mongodb.com>   2019-06-27 17:31:09 -0400
commit     c11b97788fcc91288deac647ddcc11625607d256 (patch)
tree       3c95e51b7ed04c7a7aa94e6ef56a30ec293e0d9e /src/mongo
parent     d960519275aba7e6611294903cd2b5156710a73b (diff)
SERVER-41676 Convert TransactionRouter to use observer pattern to synchronize internal data with external observers
Diffstat (limited to 'src/mongo')
-rw-r--r--  src/mongo/s/cluster_commands_helpers.cpp                     |   7
-rw-r--r--  src/mongo/s/commands/cluster_abort_transaction_cmd.cpp       |   2
-rw-r--r--  src/mongo/s/commands/cluster_commit_transaction_cmd.cpp      |   4
-rw-r--r--  src/mongo/s/commands/cluster_distinct_cmd.cpp                |   2
-rw-r--r--  src/mongo/s/commands/cluster_find_and_modify_cmd.cpp         |   8
-rw-r--r--  src/mongo/s/commands/cluster_write_cmd.cpp                   |  11
-rw-r--r--  src/mongo/s/commands/document_shard_key_update_util.cpp      |  13
-rw-r--r--  src/mongo/s/commands/document_shard_key_update_util.h        |   8
-rw-r--r--  src/mongo/s/commands/strategy.cpp                            |  77
-rw-r--r--  src/mongo/s/multi_statement_transaction_requests_sender.cpp  |   5
-rw-r--r--  src/mongo/s/query/cluster_aggregate.cpp                      |   2
-rw-r--r--  src/mongo/s/query/cluster_find.cpp                           |   6
-rw-r--r--  src/mongo/s/transaction_router.cpp                           | 560
-rw-r--r--  src/mongo/s/transaction_router.h                             | 671
-rw-r--r--  src/mongo/s/transaction_router_test.cpp                      | 424
-rw-r--r--  src/mongo/s/write_ops/batch_write_exec_test.cpp              |   4
-rw-r--r--  src/mongo/s/write_ops/batch_write_op.cpp                     |   2
-rw-r--r--  src/mongo/s/write_ops/batch_write_op_test.cpp                |   2
-rw-r--r--  src/mongo/s/write_ops/write_op.cpp                           |   2
-rw-r--r--  src/mongo/s/write_ops/write_op_test.cpp                      |   2
20 files changed, 1007 insertions(+), 805 deletions(-)
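
The core of the change is that TransactionRouter's public API moves onto a Router handle obtained from TransactionRouter::get(opCtx) (used with '.' rather than '->'), and the router's state is split so that fields visible to external observers are only written while holding the Client lock. The diff relies on three accessors: o() reads observable state, o(lk) writes it under a stdx::lock_guard<Client>, and p() touches router-private state that only the owning operation uses. The following is a minimal, self-contained sketch of that idiom; ObservableState, PrivateState, and the stand-in Client/mutex are assumptions for illustration, and the real declarations live in src/mongo/s/transaction_router.h within this commit.

// Minimal sketch of the o()/o(lk)/p() accessor idiom used throughout this patch.
// The member names and the Client stand-in below are illustrative, not the real header.
#include <mutex>
#include <string>

namespace sketch {

struct Client {
    std::mutex lock;  // stand-in for locking the mongo Client object itself
};

class TransactionRouter {
public:
    struct ObservableState {   // may be read by other threads (e.g. currentOp reporting)
        long long txnNumber = -1;
        std::string abortCause;
    };

    struct PrivateState {      // only ever touched by the operation that owns the session
        int latestStmtId = 0;
        bool terminationInitiated = false;
    };

    class Router {
    public:
        Router(TransactionRouter* tr, Client* client) : _tr(tr), _client(client) {}

        explicit operator bool() const { return _tr != nullptr; }  // falsy without a session

        void implicitlyAbort(const std::string& cause) {
            p().terminationInitiated = true;               // private state: no lock required
            std::lock_guard<std::mutex> lk(_client->lock);
            o(lk).abortCause = cause;                      // observable state: written under the lock
        }

        long long txnNumber() const { return o().txnNumber; }  // owning thread reads without a lock

    private:
        const ObservableState& o() const { return _tr->_o; }
        ObservableState& o(std::lock_guard<std::mutex>&) { return _tr->_o; }
        PrivateState& p() { return _tr->_p; }

        TransactionRouter* _tr;
        Client* _client;
    };

private:
    ObservableState _o;
    PrivateState _p;
};

}  // namespace sketch

External observers (the observer half of the pattern) take the Client lock before reading the observable state, which is why every mutation in the transaction_router.cpp hunks below acquires stdx::lock_guard<Client> lk(*opCtx->getClient()) before writing through o(lk).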
diff --git a/src/mongo/s/cluster_commands_helpers.cpp b/src/mongo/s/cluster_commands_helpers.cpp
index 82191248f6f..1cfe08eed3b 100644
--- a/src/mongo/s/cluster_commands_helpers.cpp
+++ b/src/mongo/s/cluster_commands_helpers.cpp
@@ -561,13 +561,12 @@ StatusWith<CachedCollectionRoutingInfo> getCollectionRoutingInfoForTxnCmd(
// Return the latest routing table if not running in a transaction with snapshot level read
// concern.
auto txnRouter = TransactionRouter::get(opCtx);
- if (!txnRouter || !txnRouter->getAtClusterTime()) {
+ if (!txnRouter || !txnRouter.mustUseAtClusterTime()) {
return catalogCache->getCollectionRoutingInfo(opCtx, nss);
}
- auto atClusterTime = txnRouter->getAtClusterTime();
- return catalogCache->getCollectionRoutingInfoAt(
- opCtx, nss, atClusterTime->getTime().asTimestamp());
+ auto atClusterTime = txnRouter.getSelectedAtClusterTime();
+ return catalogCache->getCollectionRoutingInfoAt(opCtx, nss, atClusterTime.asTimestamp());
}
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
index 7defb877a0b..141b1db98f0 100644
--- a/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
+++ b/src/mongo/s/commands/cluster_abort_transaction_cmd.cpp
@@ -76,7 +76,7 @@ public:
"abortTransaction can only be run within a session",
txnRouter);
- auto abortRes = txnRouter->abortTransaction(opCtx);
+ auto abortRes = txnRouter.abortTransaction(opCtx);
CommandHelpers::filterCommandReplyForPassthrough(abortRes, &result);
return true;
}
diff --git a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
index 38d6061b61b..6e4430d4127 100644
--- a/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
+++ b/src/mongo/s/commands/cluster_commit_transaction_cmd.cpp
@@ -75,11 +75,11 @@ public:
auto txnRouter = TransactionRouter::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"commitTransaction can only be run within a session",
- txnRouter != nullptr);
+ txnRouter);
const auto commitCmd =
CommitTransaction::parse(IDLParserErrorContext("commit cmd"), cmdObj);
- auto commitRes = txnRouter->commitTransaction(opCtx, commitCmd.getRecoveryToken());
+ auto commitRes = txnRouter.commitTransaction(opCtx, commitCmd.getRecoveryToken());
CommandHelpers::filterCommandReplyForPassthrough(commitRes, &result);
return true;
}
diff --git a/src/mongo/s/commands/cluster_distinct_cmd.cpp b/src/mongo/s/commands/cluster_distinct_cmd.cpp
index eb0d514c76c..ff8049b2187 100644
--- a/src/mongo/s/commands/cluster_distinct_cmd.cpp
+++ b/src/mongo/s/commands/cluster_distinct_cmd.cpp
@@ -204,7 +204,7 @@ public:
auto resolvedAggCmd = resolvedAggRequest.serializeToCommandObj().toBson();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->onViewResolutionError(opCtx, nss);
+ txnRouter.onViewResolutionError(opCtx, nss);
}
BSONObj aggResult = CommandHelpers::runCommandDirectly(
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 8d71f9c08d8..1f70c6080a0 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -311,12 +311,10 @@ private:
// transaction. We call _runCommand recursively, and this second time through
// since it will be run as a transaction it will take the other code path to
// updateShardKeyValueOnWouldChangeOwningShardError.
- auto txnRouterForShardKeyChange =
- documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
_runCommand(opCtx, shardId, shardVersion, nss, cmdObj, result);
auto commitResponse =
- documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(
- opCtx, txnRouterForShardKeyChange);
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
uassertStatusOK(getStatusFromCommandResult(commitResponse));
if (auto wcErrorElem = commitResponse["writeConcernError"]) {
@@ -331,7 +329,7 @@ private:
auto txnRouterForAbort = TransactionRouter::get(opCtx);
if (txnRouterForAbort)
- txnRouterForAbort->implicitlyAbortTransaction(opCtx, e.toStatus());
+ txnRouterForAbort.implicitlyAbortTransaction(opCtx, e.toStatus());
throw;
}
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index e3d3df7488f..a408e03f97a 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -215,8 +215,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
readConcernArgs = repl::ReadConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
- auto txnRouterForShardKeyChange =
- documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
+ documentShardKeyUpdateUtil::startTransactionForShardKeyUpdate(opCtx);
// Clear the error details from the response object before sending the write again
response->unsetErrDetails();
ClusterWriter::write(opCtx, request, &stats, response);
@@ -240,8 +239,8 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
}
// Commit the transaction
- auto commitResponse = documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(
- opCtx, txnRouterForShardKeyChange);
+ auto commitResponse =
+ documentShardKeyUpdateUtil::commitShardKeyUpdateTransaction(opCtx);
uassertStatusOK(getStatusFromCommandResult(commitResponse));
@@ -268,7 +267,7 @@ bool handleWouldChangeOwningShardError(OperationContext* opCtx,
auto txnRouterForAbort = TransactionRouter::get(opCtx);
if (txnRouterForAbort)
- txnRouterForAbort->implicitlyAbortTransaction(opCtx, status);
+ txnRouterForAbort.implicitlyAbortTransaction(opCtx, status);
return false;
}
@@ -501,7 +500,7 @@ private:
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto writeCmdStatus = response.toStatus();
if (!writeCmdStatus.isOK()) {
- txnRouter->implicitlyAbortTransaction(opCtx, writeCmdStatus);
+ txnRouter.implicitlyAbortTransaction(opCtx, writeCmdStatus);
}
}
diff --git a/src/mongo/s/commands/document_shard_key_update_util.cpp b/src/mongo/s/commands/document_shard_key_update_util.cpp
index d10b9294110..119193d9ee5 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.cpp
+++ b/src/mongo/s/commands/document_shard_key_update_util.cpp
@@ -142,20 +142,21 @@ bool updateShardKeyForDocument(OperationContext* opCtx,
opCtx, deleteCmdObj, insertCmdObj, nss.db(), documentKeyChangeInfo.getShouldUpsert());
}
-TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx) {
+void startTransactionForShardKeyUpdate(OperationContext* opCtx) {
auto txnRouter = TransactionRouter::get(opCtx);
invariant(txnRouter);
auto txnNumber = opCtx->getTxnNumber();
invariant(txnNumber);
- txnRouter->beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart);
-
- return txnRouter;
+ txnRouter.beginOrContinueTxn(opCtx, *txnNumber, TransactionRouter::TransactionActions::kStart);
}
-BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter) {
- return txnRouter->commitTransaction(opCtx, boost::none);
+BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx) {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ invariant(txnRouter);
+
+ return txnRouter.commitTransaction(opCtx, boost::none);
}
BSONObj constructShardKeyDeleteCmdObj(const NamespaceString& nss,
diff --git a/src/mongo/s/commands/document_shard_key_update_util.h b/src/mongo/s/commands/document_shard_key_update_util.h
index 8e3c4ac8ad2..a36579572cb 100644
--- a/src/mongo/s/commands/document_shard_key_update_util.h
+++ b/src/mongo/s/commands/document_shard_key_update_util.h
@@ -76,16 +76,16 @@ bool updateShardKeyForDocument(OperationContext* opCtx,
int stmtId);
/**
- * Gets the transaction router and starts a transaction on this session. This method is called when
- * WouldChangeOwningShard is thrown for a write that is not in a transaction already.
+ * Starts a transaction on this session. This method is called when WouldChangeOwningShard is thrown
+ * for a write that is not in a transaction already.
*/
-TransactionRouter* startTransactionForShardKeyUpdate(OperationContext* opCtx);
+void startTransactionForShardKeyUpdate(OperationContext* opCtx);
/**
* Commits the transaction on this session. This method is called to commit the transaction started
* when WouldChangeOwningShard is thrown for a write that is not in a transaction already.
*/
-BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx, TransactionRouter* txnRouter);
+BSONObj commitShardKeyUpdateTransaction(OperationContext* opCtx);
/**
* Creates the BSONObj that will be used to delete the pre-image document. Will also attach
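
As the two document_shard_key_update_util hunks above show, startTransactionForShardKeyUpdate no longer returns a TransactionRouter* and commitShardKeyUpdateTransaction no longer accepts one; each helper re-derives the router handle from the OperationContext. Below is a rough, self-contained sketch of that shape; the OperationContext, the lookup, and the method bodies are placeholders rather than the real mongo declarations.

#include <cassert>

namespace sketch {

struct OperationContext;  // placeholder for mongo's OperationContext

struct TransactionRouter {
    struct Router {
        explicit Router(TransactionRouter* tr) : _tr(tr) {}
        explicit operator bool() const { return _tr != nullptr; }
        void beginOrContinueTxn() {}   // placeholder bodies
        void commitTransaction() {}
        TransactionRouter* _tr;
    };
    static Router get(OperationContext* opCtx);  // looks the router up via the session
};

struct OperationContext {
    TransactionRouter router;  // stand-in: the real router hangs off the Session
};

TransactionRouter::Router TransactionRouter::get(OperationContext* opCtx) {
    return Router(&opCtx->router);
}

// Before this patch the first helper returned a TransactionRouter* that the caller had to
// thread through to commit; now both helpers fetch the handle themselves.
void startTransactionForShardKeyUpdate(OperationContext* opCtx) {
    auto txnRouter = TransactionRouter::get(opCtx);
    assert(txnRouter);
    txnRouter.beginOrContinueTxn();
}

void commitShardKeyUpdateTransaction(OperationContext* opCtx) {
    auto txnRouter = TransactionRouter::get(opCtx);
    assert(txnRouter);
    txnRouter.commitTransaction();
}

}  // namespace sketch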
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index fcdf8a8299d..f50597ba972 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -158,10 +158,12 @@ void appendRequiredFieldsToResponse(OperationContext* opCtx, BSONObjBuilder* res
*/
void invokeInTransactionRouter(OperationContext* opCtx,
CommandInvocation* invocation,
- TransactionRouter* txnRouter,
rpc::ReplyBuilderInterface* result) {
+ auto txnRouter = TransactionRouter::get(opCtx);
+ invariant(txnRouter);
+
// No-op if the transaction is not running with snapshot read concern.
- txnRouter->setDefaultAtClusterTime(opCtx);
+ txnRouter.setDefaultAtClusterTime(opCtx);
try {
invocation->run(opCtx, result);
@@ -173,7 +175,7 @@ void invokeInTransactionRouter(OperationContext* opCtx,
throw;
}
- txnRouter->implicitlyAbortTransaction(opCtx, e.toStatus());
+ txnRouter.implicitlyAbortTransaction(opCtx, e.toStatus());
throw;
}
}
@@ -181,12 +183,12 @@ void invokeInTransactionRouter(OperationContext* opCtx,
/**
* Adds info from the active transaction and the given reason as context to the active exception.
*/
-void addContextForTransactionAbortingError(TransactionRouter* txnRouter,
+void addContextForTransactionAbortingError(StringData txnIdAsString,
+ StmtId latestStmtId,
DBException& ex,
StringData reason) {
- ex.addContext(str::stream() << "Transaction " << txnRouter->txnIdToString()
- << " was aborted on statement "
- << txnRouter->getLatestStmtId()
+ ex.addContext(str::stream() << "Transaction " << txnIdAsString << " was aborted on statement "
+ << latestStmtId
<< " due to: "
<< reason);
}
@@ -280,7 +282,7 @@ void execCommandClient(OperationContext* opCtx,
auto txnRouter = TransactionRouter::get(opCtx);
if (!supportsWriteConcern) {
if (txnRouter) {
- invokeInTransactionRouter(opCtx, invocation, txnRouter, result);
+ invokeInTransactionRouter(opCtx, invocation, result);
} else {
invocation->run(opCtx, result);
}
@@ -291,7 +293,7 @@ void execCommandClient(OperationContext* opCtx,
opCtx->setWriteConcern(wcResult);
if (txnRouter) {
- invokeInTransactionRouter(opCtx, invocation, txnRouter, result);
+ invokeInTransactionRouter(opCtx, invocation, result);
} else {
invocation->run(opCtx, result);
}
@@ -314,8 +316,8 @@ void execCommandClient(OperationContext* opCtx,
c->incrementCommandsFailed();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->implicitlyAbortTransaction(opCtx,
- getStatusFromCommandResult(body.asTempObj()));
+ txnRouter.implicitlyAbortTransaction(opCtx,
+ getStatusFromCommandResult(body.asTempObj()));
}
}
}
@@ -430,7 +432,7 @@ void runCommand(OperationContext* opCtx,
return TransactionRouter::TransactionActions::kContinue;
})();
- txnRouter->beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
+ txnRouter.beginOrContinueTxn(opCtx, *txnNumber, transactionAction);
}
for (int tries = 0;; ++tries) {
@@ -451,7 +453,7 @@ void runCommand(OperationContext* opCtx,
auto responseBuilder = replyBuilder->getBodyBuilder();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->appendRecoveryToken(&responseBuilder);
+ txnRouter.appendRecoveryToken(&responseBuilder);
}
return;
@@ -493,21 +495,27 @@ void runCommand(OperationContext* opCtx,
// error cannot be retried on.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto abortGuard = makeGuard(
- [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); });
+ [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries");
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "exhausted retries");
throw;
}
- if (!txnRouter->canContinueOnStaleShardOrDbError(commandName)) {
+ if (!txnRouter.canContinueOnStaleShardOrDbError(commandName)) {
addContextForTransactionAbortingError(
- txnRouter, ex, "an error from cluster data placement change");
+ txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "an error from cluster data placement change");
throw;
}
// The error is retryable, so update transaction state before retrying.
- txnRouter->onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
abortGuard.dismiss();
continue;
@@ -526,21 +534,27 @@ void runCommand(OperationContext* opCtx,
// error cannot be retried on.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto abortGuard = makeGuard(
- [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); });
+ [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries");
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "exhausted retries");
throw;
}
- if (!txnRouter->canContinueOnStaleShardOrDbError(commandName)) {
+ if (!txnRouter.canContinueOnStaleShardOrDbError(commandName)) {
addContextForTransactionAbortingError(
- txnRouter, ex, "an error from cluster data placement change");
+ txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "an error from cluster data placement change");
throw;
}
// The error is retryable, so update transaction state before retrying.
- txnRouter->onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
+ txnRouter.onStaleShardOrDbError(opCtx, commandName, ex.toStatus());
abortGuard.dismiss();
continue;
@@ -557,21 +571,26 @@ void runCommand(OperationContext* opCtx,
// error cannot be retried on.
if (auto txnRouter = TransactionRouter::get(opCtx)) {
auto abortGuard = makeGuard(
- [&] { txnRouter->implicitlyAbortTransaction(opCtx, ex.toStatus()); });
+ [&] { txnRouter.implicitlyAbortTransaction(opCtx, ex.toStatus()); });
if (!canRetry) {
- addContextForTransactionAbortingError(txnRouter, ex, "exhausted retries");
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "exhausted retries");
throw;
}
- if (!txnRouter->canContinueOnSnapshotError()) {
- addContextForTransactionAbortingError(
- txnRouter, ex, "a non-retryable snapshot error");
+ if (!txnRouter.canContinueOnSnapshotError()) {
+ addContextForTransactionAbortingError(txnRouter.txnIdToString(),
+ txnRouter.getLatestStmtId(),
+ ex,
+ "a non-retryable snapshot error");
throw;
}
// The error is retryable, so update transaction state before retrying.
- txnRouter->onSnapshotError(opCtx, ex.toStatus());
+ txnRouter.onSnapshotError(opCtx, ex.toStatus());
abortGuard.dismiss();
continue;
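
The three retry blocks in the strategy.cpp hunks above share one shape: wrap the implicit abort in a guard, add transaction context to the error when the statement cannot be retried, and otherwise update router state and dismiss the guard before retrying. A compact, self-contained sketch of that control flow follows; ScopeGuard is a stand-in for mongo's makeGuard, and the simulated error and retry limit are illustrative only.

#include <functional>
#include <iostream>
#include <stdexcept>

// Stand-in for mongo::makeGuard: runs the callback on scope exit unless dismissed.
class ScopeGuard {
public:
    explicit ScopeGuard(std::function<void()> f) : _f(std::move(f)) {}
    ~ScopeGuard() {
        if (_active)
            _f();
    }
    void dismiss() {
        _active = false;
    }

private:
    std::function<void()> _f;
    bool _active = true;
};

bool canRetryOnError(int tries) {
    return tries < 3;  // stand-in for canContinueOnStaleShardOrDbError()/canRetry
}

void runStatementOnce(int tries) {
    if (tries < 2)
        throw std::runtime_error("stale shard version");  // simulated retryable error
}

int main() {
    for (int tries = 0;; ++tries) {
        try {
            runStatementOnce(tries);
            std::cout << "statement succeeded after " << tries << " retries\n";
            return 0;
        } catch (const std::exception& ex) {
            // If the guard is never dismissed, the transaction is implicitly aborted on exit.
            ScopeGuard abortGuard([&] { std::cout << "implicitly aborting: " << ex.what() << '\n'; });
            if (!canRetryOnError(tries))
                throw;  // guard fires while the exception propagates
            // Retryable: update transaction state (e.g. clear pending participants), then retry.
            abortGuard.dismiss();
        }
    }
}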
diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
index e376c4dd551..9671914d315 100644
--- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp
+++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
@@ -50,7 +50,7 @@ std::vector<AsyncRequestsSender::Request> attachTxnDetails(
for (auto request : requests) {
newRequests.emplace_back(
request.shardId,
- txnRouter->attachTxnFieldsIfNeeded(opCtx, request.shardId, request.cmdObj));
+ txnRouter.attachTxnFieldsIfNeeded(opCtx, request.shardId, request.cmdObj));
}
return newRequests;
@@ -66,7 +66,8 @@ void processReplyMetadata(OperationContext* opCtx, const AsyncRequestsSender::Re
return;
}
- txnRouter->processParticipantResponse(response.shardId, response.swResponse.getValue().data);
+ txnRouter.processParticipantResponse(
+ opCtx, response.shardId, response.swResponse.getValue().data);
}
} // unnamed namespace
diff --git a/src/mongo/s/query/cluster_aggregate.cpp b/src/mongo/s/query/cluster_aggregate.cpp
index 943fe8b49bc..76a5d1003fd 100644
--- a/src/mongo/s/query/cluster_aggregate.cpp
+++ b/src/mongo/s/query/cluster_aggregate.cpp
@@ -952,7 +952,7 @@ Status ClusterAggregate::retryOnViewError(OperationContext* opCtx,
result->resetToEmpty();
if (auto txnRouter = TransactionRouter::get(opCtx)) {
- txnRouter->onViewResolutionError(opCtx, requestedNss);
+ txnRouter.onViewResolutionError(opCtx, requestedNss);
}
// We pass both the underlying collection namespace and the view namespace here. The
diff --git a/src/mongo/s/query/cluster_find.cpp b/src/mongo/s/query/cluster_find.cpp
index 6e2b7fe7098..efff0cfd072 100644
--- a/src/mongo/s/query/cluster_find.cpp
+++ b/src/mongo/s/query/cluster_find.cpp
@@ -452,15 +452,15 @@ CursorId ClusterFind::runQuery(OperationContext* opCtx,
catalogCache->onStaleShardVersion(std::move(routingInfo));
if (auto txnRouter = TransactionRouter::get(opCtx)) {
- if (!txnRouter->canContinueOnStaleShardOrDbError(kFindCmdName)) {
+ if (!txnRouter.canContinueOnStaleShardOrDbError(kFindCmdName)) {
throw;
}
// Reset the default global read timestamp so the retry's routing table reflects the
// chunk placement after the refresh (no-op if the transaction is not running with
// snapshot read concern).
- txnRouter->onStaleShardOrDbError(opCtx, kFindCmdName, ex.toStatus());
- txnRouter->setDefaultAtClusterTime(opCtx);
+ txnRouter.onStaleShardOrDbError(opCtx, kFindCmdName, ex.toStatus());
+ txnRouter.setDefaultAtClusterTime(opCtx);
}
}
}
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 4987c62c0a0..1c5c9946ad6 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -235,14 +235,52 @@ BSONObj sendCommitDirectlyToShards(OperationContext* opCtx, const std::vector<Sh
return lastResult;
}
+// Helper to convert the CommitType enum into a human readable string for diagnostics.
+std::string commitTypeToString(TransactionRouter::CommitType state) {
+ switch (state) {
+ case TransactionRouter::CommitType::kNotInitiated:
+ return "notInitiated";
+ case TransactionRouter::CommitType::kNoShards:
+ return "noShards";
+ case TransactionRouter::CommitType::kSingleShard:
+ return "singleShard";
+ case TransactionRouter::CommitType::kSingleWriteShard:
+ return "singleWriteShard";
+ case TransactionRouter::CommitType::kReadOnly:
+ return "readOnly";
+ case TransactionRouter::CommitType::kTwoPhaseCommit:
+ return "twoPhaseCommit";
+ case TransactionRouter::CommitType::kRecoverWithToken:
+ return "recoverWithToken";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // unnamed namespace
+TransactionRouter::TransactionRouter() = default;
+
+TransactionRouter::~TransactionRouter() = default;
+
+TransactionRouter::Observer::Observer(const ObservableSession& osession)
+ : Observer(&getTransactionRouter(osession.get())) {}
+
+TransactionRouter::Router::Router(OperationContext* opCtx)
+ : Observer([opCtx]() -> TransactionRouter* {
+ if (auto session = OperationContextSession::get(opCtx)) {
+ return &getTransactionRouter(session);
+ }
+ return nullptr;
+ }()) {}
+
TransactionRouter::Participant::Participant(bool inIsCoordinator,
StmtId inStmtIdCreatedAt,
+ ReadOnly inReadOnly,
SharedTransactionOptions inSharedOptions)
: isCoordinator(inIsCoordinator),
- stmtIdCreatedAt(inStmtIdCreatedAt),
- sharedOptions(std::move(inSharedOptions)) {}
+ readOnly(inReadOnly),
+ sharedOptions(std::move(inSharedOptions)),
+ stmtIdCreatedAt(inStmtIdCreatedAt) {}
BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
BSONObj cmd, bool isFirstStatementInThisParticipant) const {
@@ -302,12 +340,13 @@ BSONObj TransactionRouter::Participant::attachTxnFieldsIfNeeded(
return newCmd.obj();
}
-void TransactionRouter::processParticipantResponse(const ShardId& shardId,
- const BSONObj& responseObj) {
+void TransactionRouter::Router::processParticipantResponse(OperationContext* opCtx,
+ const ShardId& shardId,
+ const BSONObj& responseObj) {
auto participant = getParticipant(shardId);
invariant(participant, "Participant should exist if processing participant response");
- if (_terminationInitiated) {
+ if (p().terminationInitiated) {
// Do not process the transaction metadata after commit or abort have been initiated,
// since a participant's state is partially reset on commit and abort.
return;
@@ -318,7 +357,7 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId,
return;
}
- if (participant->stmtIdCreatedAt != _latestStmtId) {
+ if (participant->stmtIdCreatedAt != p().latestStmtId) {
uassert(
51112,
str::stream() << "readOnly field for participant " << shardId
@@ -332,7 +371,7 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId,
if (txnResponseMetadata.getReadOnly()) {
if (participant->readOnly == Participant::ReadOnly::kUnset) {
LOG(3) << txnIdToString() << " Marking " << shardId << " as read-only";
- participant->readOnly = Participant::ReadOnly::kReadOnly;
+ _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kReadOnly);
return;
}
@@ -348,11 +387,12 @@ void TransactionRouter::processParticipantResponse(const ShardId& shardId,
if (participant->readOnly != Participant::ReadOnly::kNotReadOnly) {
LOG(3) << txnIdToString() << " Marking " << shardId << " as having done a write";
- participant->readOnly = Participant::ReadOnly::kNotReadOnly;
- if (!_recoveryShardId) {
+ _setReadOnlyForParticipant(opCtx, shardId, Participant::ReadOnly::kNotReadOnly);
+
+ if (!p().recoveryShardId) {
LOG(3) << txnIdToString() << " Choosing " << shardId << " as recovery shard";
- _recoveryShardId = shardId;
+ p().recoveryShardId = shardId;
}
}
}
@@ -373,39 +413,26 @@ bool TransactionRouter::AtClusterTime::canChange(StmtId currentStmtId) const {
return !_stmtIdSelectedAt || *_stmtIdSelectedAt == currentStmtId;
}
-TransactionRouter* TransactionRouter::get(OperationContext* opCtx) {
- const auto session = OperationContextSession::get(opCtx);
- if (session) {
- return &getTransactionRouter(session);
- }
-
- return nullptr;
+bool TransactionRouter::Router::mustUseAtClusterTime() const {
+ return o().atClusterTime.is_initialized();
}
-TransactionRouter* get(const ObservableSession& osession) {
- return &getTransactionRouter(osession.get());
+LogicalTime TransactionRouter::Router::getSelectedAtClusterTime() const {
+ invariant(o().atClusterTime);
+ return o().atClusterTime->getTime();
}
-TransactionRouter::TransactionRouter() = default;
-
-TransactionRouter::~TransactionRouter() = default;
-
-const boost::optional<TransactionRouter::AtClusterTime>& TransactionRouter::getAtClusterTime()
- const {
- return _atClusterTime;
+const boost::optional<ShardId>& TransactionRouter::Router::getCoordinatorId() const {
+ return o().coordinatorId;
}
-const boost::optional<ShardId>& TransactionRouter::getCoordinatorId() const {
- return _coordinatorId;
+const boost::optional<ShardId>& TransactionRouter::Router::getRecoveryShardId() const {
+ return p().recoveryShardId;
}
-const boost::optional<ShardId>& TransactionRouter::getRecoveryShardId() const {
- return _recoveryShardId;
-}
-
-BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx,
- const ShardId& shardId,
- const BSONObj& cmdObj) {
+BSONObj TransactionRouter::Router::attachTxnFieldsIfNeeded(OperationContext* opCtx,
+ const ShardId& shardId,
+ const BSONObj& cmdObj) {
RouterTransactionsMetrics::get(opCtx)->incrementTotalRequestsTargeted();
if (auto txnPart = getParticipant(shardId)) {
LOG(4) << txnIdToString()
@@ -413,66 +440,93 @@ BSONObj TransactionRouter::attachTxnFieldsIfNeeded(OperationContext* opCtx,
return txnPart->attachTxnFieldsIfNeeded(cmdObj, false);
}
- auto txnPart = _createParticipant(shardId);
+ auto txnPart = _createParticipant(opCtx, shardId);
LOG(4) << txnIdToString() << " Sending transaction fields to new participant: " << shardId;
- if (!_isRecoveringCommit) {
+ if (!p().isRecoveringCommit) {
// Don't update participant stats during recovery since the participant list isn't known.
RouterTransactionsMetrics::get(opCtx)->incrementTotalContactedParticipants();
}
+
return txnPart.attachTxnFieldsIfNeeded(cmdObj, true);
}
-void TransactionRouter::_verifyParticipantAtClusterTime(const Participant& participant) {
+void TransactionRouter::Router::_verifyParticipantAtClusterTime(const Participant& participant) {
const auto& participantAtClusterTime = participant.sharedOptions.atClusterTime;
invariant(participantAtClusterTime);
- invariant(*participantAtClusterTime == _atClusterTime->getTime());
+ invariant(*participantAtClusterTime == o().atClusterTime->getTime());
}
-TransactionRouter::Participant* TransactionRouter::getParticipant(const ShardId& shard) {
- const auto iter = _participants.find(shard.toString());
- if (iter == _participants.end())
+const TransactionRouter::Participant* TransactionRouter::Router::getParticipant(
+ const ShardId& shard) {
+ const auto iter = o().participants.find(shard.toString());
+ if (iter == o().participants.end())
return nullptr;
- if (_atClusterTime) {
+ if (o().atClusterTime) {
_verifyParticipantAtClusterTime(iter->second);
}
return &iter->second;
}
-TransactionRouter::Participant& TransactionRouter::_createParticipant(const ShardId& shard) {
+TransactionRouter::Participant& TransactionRouter::Router::_createParticipant(
+ OperationContext* opCtx, const ShardId& shard) {
+
// The first participant is chosen as the coordinator.
- auto isFirstParticipant = _participants.empty();
+ auto isFirstParticipant = o().participants.empty();
if (isFirstParticipant) {
- invariant(!_coordinatorId);
- _coordinatorId = shard.toString();
+ invariant(!o().coordinatorId);
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).coordinatorId = shard.toString();
}
SharedTransactionOptions sharedOptions = {
- _txnNumber,
- _readConcernArgs,
- _atClusterTime ? boost::optional<LogicalTime>(_atClusterTime->getTime()) : boost::none};
+ o().txnNumber,
+ o().readConcernArgs,
+ o().atClusterTime ? boost::optional<LogicalTime>(o().atClusterTime->getTime())
+ : boost::none};
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
auto resultPair =
- _participants.try_emplace(shard.toString(),
- TransactionRouter::Participant(
- isFirstParticipant, _latestStmtId, std::move(sharedOptions)));
+ o(lk).participants.try_emplace(shard.toString(),
+ TransactionRouter::Participant(isFirstParticipant,
+ p().latestStmtId,
+ Participant::ReadOnly::kUnset,
+ std::move(sharedOptions)));
return resultPair.first->second;
}
-void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction(
+void TransactionRouter::Router::_setReadOnlyForParticipant(OperationContext* opCtx,
+ const ShardId& shard,
+ const Participant::ReadOnly readOnly) {
+ const auto iter = o().participants.find(shard.toString());
+ invariant(iter != o().participants.end());
+ const auto currentParticipant = iter->second;
+
+ auto newParticipant =
+ TransactionRouter::Participant(currentParticipant.isCoordinator,
+ currentParticipant.stmtIdCreatedAt,
+ readOnly,
+ std::move(currentParticipant.sharedOptions));
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).participants.erase(iter);
+ o(lk).participants.try_emplace(shard.toString(), std::move(newParticipant));
+}
+
+void TransactionRouter::Router::_assertAbortStatusIsOkOrNoSuchTransaction(
const AsyncRequestsSender::Response& response) const {
auto shardResponse = uassertStatusOKWithContext(
std::move(response.swResponse),
str::stream() << "Failed to send abort to shard " << response.shardId
<< " between retries of statement "
- << _latestStmtId);
+ << p().latestStmtId);
auto status = getStatusFromCommandResult(shardResponse.data);
uassert(ErrorCodes::NoSuchTransaction,
str::stream() << txnIdToString() << "Transaction aborted between retries of statement "
- << _latestStmtId
+ << p().latestStmtId
<< " due to error: "
<< status
<< " from shard: "
@@ -483,17 +537,17 @@ void TransactionRouter::_assertAbortStatusIsOkOrNoSuchTransaction(
// concern error.
}
-std::vector<ShardId> TransactionRouter::_getPendingParticipants() const {
+std::vector<ShardId> TransactionRouter::Router::_getPendingParticipants() const {
std::vector<ShardId> pendingParticipants;
- for (const auto& participant : _participants) {
- if (participant.second.stmtIdCreatedAt == _latestStmtId) {
+ for (const auto& participant : o().participants) {
+ if (participant.second.stmtIdCreatedAt == p().latestStmtId) {
pendingParticipants.emplace_back(ShardId(participant.first));
}
}
return pendingParticipants;
}
-void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) {
+void TransactionRouter::Router::_clearPendingParticipants(OperationContext* opCtx) {
const auto pendingParticipants = _getPendingParticipants();
// Send abort to each pending participant. This resets their transaction state and guarantees no
@@ -520,30 +574,33 @@ void TransactionRouter::_clearPendingParticipants(OperationContext* opCtx) {
// If the participant being removed was chosen as the recovery shard, reset the recovery
// shard. This is safe because this participant is a pending participant, meaning it
// cannot have been returned in the recoveryToken on an earlier statement.
- if (_recoveryShardId && *_recoveryShardId == participant) {
- _recoveryShardId.reset();
+ if (p().recoveryShardId && *p().recoveryShardId == participant) {
+ p().recoveryShardId.reset();
}
- invariant(_participants.erase(participant));
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ invariant(o(lk).participants.erase(participant));
}
// If there are no more participants, also clear the coordinator id because a new one must be
// chosen by the retry.
- if (_participants.empty()) {
- _coordinatorId.reset();
+ if (o().participants.empty()) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).coordinatorId.reset();
return;
}
// If participants were created by an earlier command, the coordinator must be one of them.
- invariant(_coordinatorId);
- invariant(_participants.count(*_coordinatorId) == 1);
+ invariant(o().coordinatorId);
+ invariant(o().participants.count(*o().coordinatorId) == 1);
}
-bool TransactionRouter::canContinueOnStaleShardOrDbError(StringData cmdName) const {
+bool TransactionRouter::Router::canContinueOnStaleShardOrDbError(StringData cmdName) const {
if (MONGO_FAIL_POINT(enableStaleVersionAndSnapshotRetriesWithinTransactions)) {
// We can always retry on the first overall statement because all targeted participants must
// be pending, so the retry will restart the local transaction on each one, overwriting any
// effects from the first attempt.
- if (_latestStmtId == _firstStmtId) {
+ if (p().latestStmtId == p().firstStmtId) {
return true;
}
@@ -563,9 +620,9 @@ bool TransactionRouter::canContinueOnStaleShardOrDbError(StringData cmdName) con
return false;
}
-void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx,
- StringData cmdName,
- const Status& errorStatus) {
+void TransactionRouter::Router::onStaleShardOrDbError(OperationContext* opCtx,
+ StringData cmdName,
+ const Status& errorStatus) {
invariant(canContinueOnStaleShardOrDbError(cmdName));
LOG(3) << txnIdToString()
@@ -576,7 +633,8 @@ void TransactionRouter::onStaleShardOrDbError(OperationContext* opCtx,
_clearPendingParticipants(opCtx);
}
-void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss) {
+void TransactionRouter::Router::onViewResolutionError(OperationContext* opCtx,
+ const NamespaceString& nss) {
// The router can always retry on a view resolution error.
LOG(3) << txnIdToString()
@@ -588,72 +646,80 @@ void TransactionRouter::onViewResolutionError(OperationContext* opCtx, const Nam
_clearPendingParticipants(opCtx);
}
-bool TransactionRouter::canContinueOnSnapshotError() const {
+bool TransactionRouter::Router::canContinueOnSnapshotError() const {
if (MONGO_FAIL_POINT(enableStaleVersionAndSnapshotRetriesWithinTransactions)) {
- return _atClusterTime && _atClusterTime->canChange(_latestStmtId);
+ return o().atClusterTime && o().atClusterTime->canChange(p().latestStmtId);
}
return false;
}
-void TransactionRouter::onSnapshotError(OperationContext* opCtx, const Status& errorStatus) {
+void TransactionRouter::Router::onSnapshotError(OperationContext* opCtx,
+ const Status& errorStatus) {
invariant(canContinueOnSnapshotError());
LOG(3) << txnIdToString() << " Clearing pending participants and resetting global snapshot "
"timestamp after snapshot error: "
- << errorStatus << ", previous timestamp: " << _atClusterTime->getTime();
+ << errorStatus << ", previous timestamp: " << o().atClusterTime->getTime();
// The transaction must be restarted on all participants because a new read timestamp will be
// selected, so clear all pending participants. Snapshot errors are only retryable on the first
// client statement, so all participants should be cleared, including the coordinator.
_clearPendingParticipants(opCtx);
- invariant(_participants.empty());
- invariant(!_coordinatorId);
+ invariant(o().participants.empty());
+ invariant(!o().coordinatorId);
+
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
// Reset the global snapshot timestamp so the retry will select a new one.
- _atClusterTime.reset();
- _atClusterTime.emplace();
+ o(lk).atClusterTime.reset();
+ o(lk).atClusterTime.emplace();
}
-void TransactionRouter::setDefaultAtClusterTime(OperationContext* opCtx) {
- if (!_atClusterTime || !_atClusterTime->canChange(_latestStmtId)) {
+void TransactionRouter::Router::setDefaultAtClusterTime(OperationContext* opCtx) {
+ if (!o().atClusterTime || !o().atClusterTime->canChange(p().latestStmtId)) {
return;
}
auto defaultTime = LogicalClock::get(opCtx)->getClusterTime();
- _setAtClusterTime(repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(), defaultTime);
+ _setAtClusterTime(
+ opCtx, repl::ReadConcernArgs::get(opCtx).getArgsAfterClusterTime(), defaultTime);
}
-void TransactionRouter::_setAtClusterTime(const boost::optional<LogicalTime>& afterClusterTime,
- LogicalTime candidateTime) {
+void TransactionRouter::Router::_setAtClusterTime(
+ OperationContext* opCtx,
+ const boost::optional<LogicalTime>& afterClusterTime,
+ LogicalTime candidateTime) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+
// If the user passed afterClusterTime, the chosen time must be greater than or equal to it.
if (afterClusterTime && *afterClusterTime > candidateTime) {
- _atClusterTime->setTime(*afterClusterTime, _latestStmtId);
+ o(lk).atClusterTime->setTime(*afterClusterTime, p().latestStmtId);
return;
}
LOG(2) << txnIdToString() << " Setting global snapshot timestamp to " << candidateTime
- << " on statement " << _latestStmtId;
+ << " on statement " << p().latestStmtId;
- _atClusterTime->setTime(candidateTime, _latestStmtId);
+ o(lk).atClusterTime->setTime(candidateTime, p().latestStmtId);
}
-void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
- TxnNumber txnNumber,
- TransactionActions action) {
- if (txnNumber < _txnNumber) {
+void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action) {
+ if (txnNumber < o().txnNumber) {
// This transaction is older than the transaction currently in progress, so throw an error.
uasserted(ErrorCodes::TransactionTooOld,
str::stream() << "txnNumber " << txnNumber << " is less than last txnNumber "
- << _txnNumber
+ << o().txnNumber
<< " seen in session "
<< _sessionId());
- } else if (txnNumber == _txnNumber) {
+ } else if (txnNumber == o().txnNumber) {
// This is the same transaction as the one in progress.
switch (action) {
case TransactionActions::kStart: {
uasserted(ErrorCodes::ConflictingOperationInProgress,
- str::stream() << "txnNumber " << _txnNumber << " for session "
+ str::stream() << "txnNumber " << o().txnNumber << " for session "
<< _sessionId()
<< " already started");
}
@@ -662,15 +728,15 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
"Only the first command in a transaction may specify a readConcern",
repl::ReadConcernArgs::get(opCtx).isEmpty());
- repl::ReadConcernArgs::get(opCtx) = _readConcernArgs;
- ++_latestStmtId;
+ repl::ReadConcernArgs::get(opCtx) = o().readConcernArgs;
+ ++p().latestStmtId;
return;
}
case TransactionActions::kCommit:
- ++_latestStmtId;
+ ++p().latestStmtId;
return;
}
- } else if (txnNumber > _txnNumber) {
+ } else if (txnNumber > o().txnNumber) {
// This is a newer transaction.
switch (action) {
case TransactionActions::kStart: {
@@ -682,12 +748,17 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
!readConcernArgs.hasLevel() ||
isReadConcernLevelAllowedInTransaction(readConcernArgs.getLevel()));
- _resetRouterState(txnNumber);
+ _resetRouterState(opCtx, txnNumber);
- _readConcernArgs = readConcernArgs;
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).readConcernArgs = readConcernArgs;
+ }
- if (_readConcernArgs.getLevel() == repl::ReadConcernLevel::kSnapshotReadConcern) {
- _atClusterTime.emplace();
+ if (o().readConcernArgs.getLevel() ==
+ repl::ReadConcernLevel::kSnapshotReadConcern) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).atClusterTime.emplace();
}
_onNewTransaction(opCtx);
@@ -696,16 +767,17 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
}
case TransactionActions::kContinue: {
uasserted(ErrorCodes::NoSuchTransaction,
- str::stream() << "cannot continue txnId " << _txnNumber << " for session "
+ str::stream() << "cannot continue txnId " << o().txnNumber
+ << " for session "
<< _sessionId()
<< " with txnId "
<< txnNumber);
}
case TransactionActions::kCommit: {
- _resetRouterState(txnNumber);
+ _resetRouterState(opCtx, txnNumber);
// If the first action seen by the router for this transaction is to commit, that
// means that the client is attempting to recover a commit decision.
- _isRecoveringCommit = true;
+ p().isRecoveringCommit = true;
_onBeginRecoveringDecision(opCtx);
LOG(3) << txnIdToString() << " Commit recovery started";
@@ -716,18 +788,18 @@ void TransactionRouter::beginOrContinueTxn(OperationContext* opCtx,
MONGO_UNREACHABLE;
}
-const LogicalSessionId& TransactionRouter::_sessionId() const {
- const auto* owningSession = getTransactionRouter.owner(this);
+const LogicalSessionId& TransactionRouter::Router::_sessionId() const {
+ const auto* owningSession = getTransactionRouter.owner(_tr);
return owningSession->getSessionId();
}
-BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx) {
- invariant(_coordinatorId);
- auto coordinatorIter = _participants.find(*_coordinatorId);
- invariant(coordinatorIter != _participants.end());
+BSONObj TransactionRouter::Router::_handOffCommitToCoordinator(OperationContext* opCtx) {
+ invariant(o().coordinatorId);
+ auto coordinatorIter = o().participants.find(*o().coordinatorId);
+ invariant(coordinatorIter != o().participants.end());
std::vector<CommitParticipant> participantList;
- for (const auto& participantEntry : _participants) {
+ for (const auto& participantEntry : o().participants) {
CommitParticipant commitParticipant;
commitParticipant.setShardId(participantEntry.first);
participantList.push_back(std::move(commitParticipant));
@@ -740,13 +812,13 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx)
BSON(WriteConcernOptions::kWriteConcernField << opCtx->getWriteConcern().toBSON()));
LOG(3) << txnIdToString()
- << " Committing using two-phase commit, coordinator: " << *_coordinatorId;
+ << " Committing using two-phase commit, coordinator: " << *o().coordinatorId;
MultiStatementTransactionRequestsSender ars(
opCtx,
Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
NamespaceString::kAdminDb,
- {{*_coordinatorId, coordinateCommitCmdObj}},
+ {{*o().coordinatorId, coordinateCommitCmdObj}},
ReadPreferenceSetting{ReadPreference::PrimaryOnly},
Shard::RetryPolicy::kIdempotent);
@@ -757,9 +829,9 @@ BSONObj TransactionRouter::_handOffCommitToCoordinator(OperationContext* opCtx)
return response.swResponse.getValue().data;
}
-BSONObj TransactionRouter::commitTransaction(
+BSONObj TransactionRouter::Router::commitTransaction(
OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) {
- _terminationInitiated = true;
+ p().terminationInitiated = true;
auto commitRes = _commitTransaction(opCtx, recoveryToken);
@@ -786,32 +858,41 @@ BSONObj TransactionRouter::commitTransaction(
return commitRes;
}
-BSONObj TransactionRouter::_commitTransaction(
+BSONObj TransactionRouter::Router::_commitTransaction(
OperationContext* opCtx, const boost::optional<TxnRecoveryToken>& recoveryToken) {
- if (_isRecoveringCommit) {
+
+ if (p().isRecoveringCommit) {
uassert(50940,
"Cannot recover the transaction decision without a recoveryToken",
recoveryToken);
- _commitType = CommitType::kRecoverWithToken;
- _onStartCommit(opCtx);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kRecoverWithToken;
+ _onStartCommit(lk, opCtx);
+ }
+
return _commitWithRecoveryToken(opCtx, *recoveryToken);
}
- if (_participants.empty()) {
+ if (o().participants.empty()) {
// The participants list can be empty if a transaction was begun on mongos, but it never
// ended up targeting any hosts. Such cases are legal for example if a find is issued
// against a non-existent database.
uassert(ErrorCodes::IllegalOperation,
"Cannot commit without participants",
- _txnNumber != kUninitializedTxnNumber);
- _commitType = CommitType::kNoShards;
- _onStartCommit(opCtx);
+ o().txnNumber != kUninitializedTxnNumber);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kNoShards;
+ _onStartCommit(lk, opCtx);
+ }
+
return BSON("ok" << 1);
}
std::vector<ShardId> readOnlyShards;
std::vector<ShardId> writeShards;
- for (const auto& participant : _participants) {
+ for (const auto& participant : o().participants) {
switch (participant.second.readOnly) {
case Participant::ReadOnly::kUnset:
uasserted(ErrorCodes::NoSuchTransaction,
@@ -829,20 +910,29 @@ BSONObj TransactionRouter::_commitTransaction(
}
}
- if (_participants.size() == 1) {
- ShardId shardId = _participants.cbegin()->first;
+ if (o().participants.size() == 1) {
+ ShardId shardId = o().participants.cbegin()->first;
LOG(3) << txnIdToString()
<< " Committing single-shard transaction, single participant: " << shardId;
- _commitType = CommitType::kSingleShard;
- _onStartCommit(opCtx);
+
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kSingleShard;
+ _onStartCommit(lk, opCtx);
+ }
+
return sendCommitDirectlyToShards(opCtx, {shardId});
}
if (writeShards.size() == 0) {
LOG(3) << txnIdToString() << " Committing read-only transaction on "
<< readOnlyShards.size() << " shards";
- _commitType = CommitType::kReadOnly;
- _onStartCommit(opCtx);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kReadOnly;
+ _onStartCommit(lk, opCtx);
+ }
+
return sendCommitDirectlyToShards(opCtx, readOnlyShards);
}
@@ -850,8 +940,12 @@ BSONObj TransactionRouter::_commitTransaction(
LOG(3) << txnIdToString() << " Committing single-write-shard transaction with "
<< readOnlyShards.size()
<< " read-only shards, write shard: " << writeShards.front();
- _commitType = CommitType::kSingleWriteShard;
- _onStartCommit(opCtx);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kSingleWriteShard;
+ _onStartCommit(lk, opCtx);
+ }
+
const auto readOnlyShardsResponse = sendCommitDirectlyToShards(opCtx, readOnlyShards);
if (!getStatusFromCommandResult(readOnlyShardsResponse).isOK() ||
@@ -861,30 +955,35 @@ BSONObj TransactionRouter::_commitTransaction(
return sendCommitDirectlyToShards(opCtx, writeShards);
}
- _commitType = CommitType::kTwoPhaseCommit;
- _onStartCommit(opCtx);
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).commitType = CommitType::kTwoPhaseCommit;
+ _onStartCommit(lk, opCtx);
+ }
+
return _handOffCommitToCoordinator(opCtx);
}
-BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) {
+BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
_onExplicitAbort(opCtx);
// The router has yet to send any commands to a remote shard for this transaction.
// Return the same error that would have been returned by a shard.
uassert(ErrorCodes::NoSuchTransaction,
"no known command has been sent by this router for this transaction",
- !_participants.empty());
+ !o().participants.empty());
- _terminationInitiated = true;
+ p().terminationInitiated = true;
auto abortCmd = BSON("abortTransaction" << 1 << WriteConcernOptions::kWriteConcernField
<< opCtx->getWriteConcern().toBSON());
std::vector<AsyncRequestsSender::Request> abortRequests;
- for (const auto& participantEntry : _participants) {
+ for (const auto& participantEntry : o().participants) {
abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
}
- LOG(3) << txnIdToString() << " Aborting transaction on " << _participants.size() << " shard(s)";
+ LOG(3) << txnIdToString() << " Aborting transaction on " << o().participants.size()
+ << " shard(s)";
const auto responses = gatherResponses(opCtx,
NamespaceString::kAdminDb,
@@ -916,10 +1015,10 @@ BSONObj TransactionRouter::abortTransaction(OperationContext* opCtx) {
return lastResult;
}
-void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx,
- const Status& errorStatus) {
- if (_commitType == CommitType::kTwoPhaseCommit ||
- _commitType == CommitType::kRecoverWithToken) {
+void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opCtx,
+ const Status& errorStatus) {
+ if (o().commitType == CommitType::kTwoPhaseCommit ||
+ o().commitType == CommitType::kRecoverWithToken) {
LOG(3) << txnIdToString() << " Router not sending implicit abortTransaction because commit "
"may have been handed off to the coordinator";
return;
@@ -927,19 +1026,19 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx,
_onImplicitAbort(opCtx, errorStatus);
- if (_participants.empty()) {
+ if (o().participants.empty()) {
return;
}
- _terminationInitiated = true;
+ p().terminationInitiated = true;
auto abortCmd = BSON("abortTransaction" << 1);
std::vector<AsyncRequestsSender::Request> abortRequests;
- for (const auto& participantEntry : _participants) {
+ for (const auto& participantEntry : o().participants) {
abortRequests.emplace_back(ShardId(participantEntry.first), abortCmd);
}
- LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << _participants.size()
+ LOG(3) << txnIdToString() << " Implicitly aborting transaction on " << o().participants.size()
<< " shard(s) due to error: " << errorStatus;
try {
@@ -956,48 +1055,50 @@ void TransactionRouter::implicitlyAbortTransaction(OperationContext* opCtx,
}
}
-std::string TransactionRouter::txnIdToString() const {
- return str::stream() << _sessionId().getId() << ":" << _txnNumber;
+std::string TransactionRouter::Router::txnIdToString() const {
+ return str::stream() << _sessionId().getId() << ":" << o().txnNumber;
}
-void TransactionRouter::appendRecoveryToken(BSONObjBuilder* builder) const {
+void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) const {
BSONObjBuilder recoveryTokenBuilder(
builder->subobjStart(CommitTransaction::kRecoveryTokenFieldName));
TxnRecoveryToken recoveryToken;
// The recovery shard is chosen on the first statement that did a write (transactions that only
// did reads do not need to be recovered; they can just be retried).
- if (_recoveryShardId) {
- invariant(_participants.find(*_recoveryShardId)->second.readOnly ==
+ if (p().recoveryShardId) {
+ invariant(o().participants.find(*p().recoveryShardId)->second.readOnly ==
Participant::ReadOnly::kNotReadOnly);
- recoveryToken.setRecoveryShardId(*_recoveryShardId);
+ recoveryToken.setRecoveryShardId(*p().recoveryShardId);
}
recoveryToken.serialize(&recoveryTokenBuilder);
recoveryTokenBuilder.doneFast();
}
-void TransactionRouter::_resetRouterState(const TxnNumber& txnNumber) {
- _txnNumber = txnNumber;
- _commitType = CommitType::kNotInitiated;
- _isRecoveringCommit = false;
- _participants.clear();
- _coordinatorId.reset();
- _recoveryShardId.reset();
- _readConcernArgs = {};
- _atClusterTime.reset();
- _abortCause = std::string();
- _timingStats = TimingStats();
- _terminationInitiated = false;
+void TransactionRouter::Router::_resetRouterState(OperationContext* opCtx,
+ const TxnNumber& txnNumber) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).txnNumber = txnNumber;
+ o(lk).commitType = CommitType::kNotInitiated;
+ p().isRecoveringCommit = false;
+ o(lk).participants.clear();
+ o(lk).coordinatorId.reset();
+ p().recoveryShardId.reset();
+ o(lk).readConcernArgs = {};
+ o(lk).atClusterTime.reset();
+ o(lk).abortCause = std::string();
+ o(lk).timingStats = TimingStats();
+ p().terminationInitiated = false;
// TODO SERVER-37115: Parse statement ids from the client and remember the statement id
// of the command that started the transaction, if one was included.
- _latestStmtId = kDefaultFirstStmtId;
- _firstStmtId = kDefaultFirstStmtId;
+ p().latestStmtId = kDefaultFirstStmtId;
+ p().firstStmtId = kDefaultFirstStmtId;
};
-BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx,
- const TxnRecoveryToken& recoveryToken) {
+BSONObj TransactionRouter::Router::_commitWithRecoveryToken(OperationContext* opCtx,
+ const TxnRecoveryToken& recoveryToken) {
uassert(ErrorCodes::NoSuchTransaction,
"Recovery token is empty, meaning the transaction only performed reads and can be "
"safely retried",
@@ -1027,13 +1128,13 @@ BSONObj TransactionRouter::_commitWithRecoveryToken(OperationContext* opCtx,
.response;
}
-void TransactionRouter::_logSlowTransaction(OperationContext* opCtx,
- TerminationCause terminationCause) const {
+void TransactionRouter::Router::_logSlowTransaction(OperationContext* opCtx,
+ TerminationCause terminationCause) const {
log() << "transaction " << _transactionInfoForLog(opCtx, terminationCause);
}
-std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx,
- TerminationCause terminationCause) const {
+std::string TransactionRouter::Router::_transactionInfoForLog(
+ OperationContext* opCtx, TerminationCause terminationCause) const {
StringBuilder sb;
BSONObjBuilder parametersBuilder;
@@ -1042,24 +1143,24 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx,
_sessionId().serialize(&lsidBuilder);
lsidBuilder.doneFast();
- parametersBuilder.append("txnNumber", _txnNumber);
+ parametersBuilder.append("txnNumber", o().txnNumber);
parametersBuilder.append("autocommit", false);
- _readConcernArgs.appendInfo(&parametersBuilder);
+ o().readConcernArgs.appendInfo(&parametersBuilder);
sb << "parameters:" << parametersBuilder.obj().toString() << ",";
- if (_atClusterTime) {
- sb << " globalReadTimestamp:" << _atClusterTime->getTime().toString() << ",";
+ if (o().atClusterTime) {
+ sb << " globalReadTimestamp:" << o().atClusterTime->getTime().toString() << ",";
}
- if (_commitType != CommitType::kRecoverWithToken) {
+ if (o().commitType != CommitType::kRecoverWithToken) {
// We don't know the participants if we're recovering the commit.
- sb << " numParticipants:" << _participants.size() << ",";
+ sb << " numParticipants:" << o().participants.size() << ",";
}
- if (_commitType == CommitType::kTwoPhaseCommit) {
- invariant(_coordinatorId);
- sb << " coordinator:" << *_coordinatorId << ",";
+ if (o().commitType == CommitType::kTwoPhaseCommit) {
+ invariant(o().coordinatorId);
+ sb << " coordinator:" << *o().coordinatorId << ",";
}
auto tickSource = opCtx->getServiceContext()->getTickSource();
@@ -1068,22 +1169,22 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx,
if (terminationCause == TerminationCause::kCommitted) {
sb << " terminationCause:committed,";
- invariant(_commitType != CommitType::kNotInitiated);
- invariant(_abortCause.empty());
+ invariant(o().commitType != CommitType::kNotInitiated);
+ invariant(o().abortCause.empty());
} else {
sb << " terminationCause:aborted,";
- invariant(!_abortCause.empty());
- sb << " abortCause:" << _abortCause << ",";
+ invariant(!o().abortCause.empty());
+ sb << " abortCause:" << o().abortCause << ",";
// TODO SERVER-40985: Log abortSource
}
- if (_commitType != CommitType::kNotInitiated) {
- sb << " commitType:" << _commitTypeToString(_commitType) << ",";
+ if (o().commitType != CommitType::kNotInitiated) {
+ sb << " commitType:" << commitTypeToString(o().commitType) << ",";
sb << " commitDurationMicros:"
- << durationCount<Microseconds>(_timingStats.getCommitDuration(tickSource, curTicks))
+ << durationCount<Microseconds>(o().timingStats.getCommitDuration(tickSource, curTicks))
<< ",";
}
@@ -1093,29 +1194,36 @@ std::string TransactionRouter::_transactionInfoForLog(OperationContext* opCtx,
// Total duration of the transaction. Logged at the end of the line for consistency with slow
// command logging.
- sb << " " << duration_cast<Milliseconds>(_timingStats.getDuration(tickSource, curTicks));
+ sb << " " << duration_cast<Milliseconds>(o().timingStats.getDuration(tickSource, curTicks));
return sb.str();
}
-void TransactionRouter::_onNewTransaction(OperationContext* opCtx) {
+void TransactionRouter::Router::_onNewTransaction(OperationContext* opCtx) {
auto tickSource = opCtx->getServiceContext()->getTickSource();
- _timingStats.startTime = tickSource->getTicks();
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).timingStats.startTime = tickSource->getTicks();
+ }
auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
routerTxnMetrics->incrementTotalStarted();
}
-void TransactionRouter::_onBeginRecoveringDecision(OperationContext* opCtx) {
+void TransactionRouter::Router::_onBeginRecoveringDecision(OperationContext* opCtx) {
auto tickSource = opCtx->getServiceContext()->getTickSource();
- _timingStats.startTime = tickSource->getTicks();
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).timingStats.startTime = tickSource->getTicks();
+ }
auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
routerTxnMetrics->incrementTotalStarted();
}
-void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status& errorStatus) {
- if (_commitType != CommitType::kNotInitiated && _timingStats.endTime == 0) {
+void TransactionRouter::Router::_onImplicitAbort(OperationContext* opCtx,
+ const Status& errorStatus) {
+ if (o().commitType != CommitType::kNotInitiated && o().timingStats.endTime == 0) {
// If commit was started but an end time wasn't set, then we don't know the commit result
// and can't consider the transaction over until the client retries commit and definitively
// learns the result. Note that this behavior may lead to no logging in some cases, but
@@ -1125,58 +1233,62 @@ void TransactionRouter::_onImplicitAbort(OperationContext* opCtx, const Status&
// Implicit abort may execute multiple times if a misbehaving client keeps sending statements
// for a txnNumber after receiving an error, so only remember the first abort cause.
- if (_abortCause.empty()) {
- _abortCause = errorStatus.codeString();
+ if (o().abortCause.empty()) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).abortCause = errorStatus.codeString();
}
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted);
}
-void TransactionRouter::_onExplicitAbort(OperationContext* opCtx) {
+void TransactionRouter::Router::_onExplicitAbort(OperationContext* opCtx) {
// A behaving client should never try to commit after attempting to abort, so we can consider
// the transaction terminated as soon as explicit abort is observed.
- if (_abortCause.empty()) {
+ if (o().abortCause.empty()) {
// Note this code means the abort was from a user abortTransaction command.
- _abortCause = "abort";
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).abortCause = "abort";
}
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted);
}
-void TransactionRouter::_onStartCommit(OperationContext* opCtx) {
- invariant(_commitType != CommitType::kNotInitiated);
+void TransactionRouter::Router::_onStartCommit(WithLock wl, OperationContext* opCtx) {
+ invariant(o().commitType != CommitType::kNotInitiated);
- if (_timingStats.commitStartTime != 0) {
+ if (o().timingStats.commitStartTime != 0) {
return;
}
auto tickSource = opCtx->getServiceContext()->getTickSource();
- _timingStats.commitStartTime = tickSource->getTicks();
+ o(wl).timingStats.commitStartTime = tickSource->getTicks();
auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
- routerTxnMetrics->incrementCommitInitiated(_commitType);
- if (_commitType != CommitType::kRecoverWithToken) {
+ routerTxnMetrics->incrementCommitInitiated(o().commitType);
+ if (o().commitType != CommitType::kRecoverWithToken) {
// We only know the participant list if we're not recovering a decision.
- routerTxnMetrics->addToTotalParticipantsAtCommit(_participants.size());
+ routerTxnMetrics->addToTotalParticipantsAtCommit(o().participants.size());
}
}
-void TransactionRouter::_onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus) {
+void TransactionRouter::Router::_onNonRetryableCommitError(OperationContext* opCtx,
+ Status commitStatus) {
// If the commit failed with a command error that can't be retried on, the transaction shouldn't
// be able to eventually commit, so it can be considered over from the router's perspective.
- if (_abortCause.empty()) {
- _abortCause = commitStatus.codeString();
+ if (o().abortCause.empty()) {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).abortCause = commitStatus.codeString();
}
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kAborted);
}
-void TransactionRouter::_onSuccessfulCommit(OperationContext* opCtx) {
+void TransactionRouter::Router::_onSuccessfulCommit(OperationContext* opCtx) {
_endTransactionTrackingIfNecessary(opCtx, TerminationCause::kCommitted);
}
-void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opCtx,
- TerminationCause terminationCause) {
- if (_timingStats.endTime != 0) {
+void TransactionRouter::Router::_endTransactionTrackingIfNecessary(
+ OperationContext* opCtx, TerminationCause terminationCause) {
+ if (o().timingStats.endTime != 0) {
// If the transaction was already ended, don't end it again.
return;
}
@@ -1184,21 +1296,25 @@ void TransactionRouter::_endTransactionTrackingIfNecessary(OperationContext* opC
auto tickSource = opCtx->getServiceContext()->getTickSource();
auto curTicks = tickSource->getTicks();
- _timingStats.endTime = curTicks;
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
+ o(lk).timingStats.endTime = curTicks;
+ }
if (shouldLog(logger::LogComponent::kTransaction, logger::LogSeverity::Debug(1)) ||
- _timingStats.getDuration(tickSource, curTicks) > Milliseconds(serverGlobalParams.slowMS)) {
+ o().timingStats.getDuration(tickSource, curTicks) >
+ Milliseconds(serverGlobalParams.slowMS)) {
_logSlowTransaction(opCtx, terminationCause);
}
auto routerTxnMetrics = RouterTransactionsMetrics::get(opCtx);
if (terminationCause == TerminationCause::kAborted) {
routerTxnMetrics->incrementTotalAborted();
- routerTxnMetrics->incrementAbortCauseMap(_abortCause);
+ routerTxnMetrics->incrementAbortCauseMap(o().abortCause);
} else {
routerTxnMetrics->incrementTotalCommitted();
routerTxnMetrics->incrementCommitSuccessful(
- _commitType, _timingStats.getCommitDuration(tickSource, curTicks));
+ o().commitType, o().timingStats.getCommitDuration(tickSource, curTicks));
}
}
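
Note on the pattern in the .cpp hunks above: every mutation of observable state happens while holding the opCtx's Client lock, via the o(lk)/o(wl) accessor, while reads by the owning thread go through the const o() accessor and router-private bookkeeping goes through p() with no extra locking. Below is a minimal, self-contained C++ sketch of that convention; TransactionRouterSketch, the simplified Client and WithLock types, and onImplicitAbort are illustrative stand-ins for this note, not the actual MongoDB classes.

// Standalone sketch of the ObservableState/PrivateState split used above.
// All type and member names are simplified stand-ins for illustration.
#include <mutex>
#include <string>

// Stand-in for the per-connection Client whose lock guards observable state.
struct Client {
    std::mutex mutex;
};

// Tag type proving the caller holds a lock, analogous to a WithLock token.
struct WithLock {
    explicit WithLock(const std::lock_guard<std::mutex>&) {}
};

class TransactionRouterSketch {
    // Fields that observers may read; the owning thread must hold the Client
    // lock while writing them.
    struct ObservableState {
        std::string abortCause;
    };

    // Fields only the thread that has the session checked out ever touches,
    // so they need no extra synchronization.
    struct PrivateState {
        int latestStmtId{0};
    };

public:
    class Router {
    public:
        Router(TransactionRouterSketch* tr, Client* client) : _tr(tr), _client(client) {}

        void onImplicitAbort(const std::string& cause) {
            if (o().abortCause.empty()) {            // Read by the owning thread.
                std::lock_guard<std::mutex> lk(_client->mutex);
                o(WithLock(lk)).abortCause = cause;  // Write only under the lock.
            }
            ++p().latestStmtId;                      // Private state: no lock needed.
        }

    private:
        const ObservableState& o() const { return _tr->_o; }
        ObservableState& o(WithLock) { return _tr->_o; }
        PrivateState& p() { return _tr->_p; }

        TransactionRouterSketch* _tr;
        Client* _client;
    };

private:
    ObservableState _o;
    PrivateState _p;
};

int main() {
    Client client;
    TransactionRouterSketch tr;
    TransactionRouterSketch::Router router(&tr, &client);
    router.onImplicitAbort("NoSuchTransaction");
    return 0;
}
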
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 1577880a92c..4d442f3a225 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -47,7 +47,17 @@ namespace mongo {
* Keeps track of the transaction state. A session is in use when it is being used by a request.
*/
class TransactionRouter {
+ struct PrivateState;
+ struct ObservableState;
+
public:
+ TransactionRouter();
+ TransactionRouter(const TransactionRouter&) = delete;
+ TransactionRouter& operator=(const TransactionRouter&) = delete;
+ TransactionRouter(TransactionRouter&&) = delete;
+ TransactionRouter& operator=(TransactionRouter&&) = delete;
+ ~TransactionRouter();
+
// The type of commit initiated for this transaction.
enum class CommitType {
kNotInitiated,
@@ -86,6 +96,7 @@ public:
Participant(bool isCoordinator,
StmtId stmtIdCreatedAt,
+ ReadOnly inReadOnly,
SharedTransactionOptions sharedOptions);
/**
@@ -98,13 +109,13 @@ public:
// Is updated to kReadOnly or kNotReadOnly based on the readOnly field in the participant's
// responses to statements.
- ReadOnly readOnly{ReadOnly::kUnset};
-
- // The highest statement id of the request during which this participant was created.
- const StmtId stmtIdCreatedAt;
+ const ReadOnly readOnly{ReadOnly::kUnset};
// Returns the shared transaction options this participant was created with
const SharedTransactionOptions sharedOptions;
+
+ // The highest statement id of the request during which this participant was created.
+ const StmtId stmtIdCreatedAt;
};
// Container for timing stats for the current transaction. Includes helpers for calculating some
@@ -174,368 +185,418 @@ public:
LogicalTime _atClusterTime;
};
- TransactionRouter();
- TransactionRouter(const TransactionRouter&) = delete;
- TransactionRouter& operator=(const TransactionRouter&) = delete;
- TransactionRouter(TransactionRouter&&) = delete;
- TransactionRouter& operator=(TransactionRouter&&) = delete;
- ~TransactionRouter();
-
/**
- * Extract the runtime state attached to the operation context. Returns nullptr if none is
- * attached.
+ * Class used by observers to examine the state of a TransactionRouter.
*/
- static TransactionRouter* get(OperationContext* opCtx);
- static TransactionRouter* get(const ObservableSession& osession);
+ class Observer {
+ public:
+ explicit Observer(const ObservableSession& session);
- /**
- * Starts a fresh transaction in this session or continue an existing one. Also cleans up the
- * previous transaction state.
- */
- void beginOrContinueTxn(OperationContext* opCtx,
- TxnNumber txnNumber,
- TransactionActions action);
+ protected:
+ explicit Observer(TransactionRouter* tr) : _tr(tr) {}
- /**
- * Attaches the required transaction related fields for a request to be sent to the given
- * shard.
- *
- * Calling this method has the following side effects:
- * 1. Potentially selecting a coordinator.
- * 2. Adding the shard to the list of participants.
- * 3. Also append fields for first statements (ex. startTransaction, readConcern)
- * if the shard was newly added to the list of participants.
- */
- BSONObj attachTxnFieldsIfNeeded(OperationContext* opCtx,
- const ShardId& shardId,
- const BSONObj& cmdObj);
+ const TransactionRouter::ObservableState& o() const {
+ return _tr->_o;
+ }
- /**
- * Processes the transaction metadata in the response from the participant if the response
- * indicates the operation succeeded.
- */
- void processParticipantResponse(const ShardId& shardId, const BSONObj& responseObj);
+ TransactionRouter* _tr;
+ }; // class Observer
/**
- * Returns true if the current transaction can retry on a stale version error from a contacted
- * shard. This is always true except for an error received by a write that is not the first
- * overall statement in the sharded transaction. This is because the entire command will be
- * retried, and shards that were not stale and are targeted again may incorrectly execute the
- * command a second time.
- *
- * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of the
- * shards that returned a stale version error was involved in a previously completed a statement
- * for this transaction.
- *
- * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to allow
- * retrying writes beyond the first overall statement.
+ * Class used by a thread that has checked out the TransactionRouter's session to observe
+ * and modify the transaction router.
*/
- bool canContinueOnStaleShardOrDbError(StringData cmdName) const;
+ class Router : public Observer {
+ public:
+ explicit Router(OperationContext* opCtx);
- /**
- * Updates the transaction state to allow for a retry of the current command on a stale version
- * error. This includes sending abortTransaction to all cleared participants. Will throw if the
- * transaction cannot be continued.
- */
- void onStaleShardOrDbError(OperationContext* opCtx,
- StringData cmdName,
- const Status& errorStatus);
+ explicit operator bool() const {
+ return _tr;
+ }
- /**
- * Returns true if the current transaction can retry on a snapshot error. This is only true on
- * the first command recevied for a transaction.
- */
- bool canContinueOnSnapshotError() const;
+ /**
+ * Starts a fresh transaction in this session or continue an existing one. Also cleans up the
+ * previous transaction state.
+ */
+ void beginOrContinueTxn(OperationContext* opCtx,
+ TxnNumber txnNumber,
+ TransactionActions action);
- /**
- * Resets the transaction state to allow for a retry attempt. This includes clearing all
- * participants, clearing the coordinator, resetting the global read timestamp, and sending
- * abortTransaction to all cleared participants. Will throw if the transaction cannot be
- * continued.
- */
- void onSnapshotError(OperationContext* opCtx, const Status& errorStatus);
+ /**
+ * Attaches the required transaction related fields for a request to be sent to the given
+ * shard.
+ *
+ * Calling this method has the following side effects:
+ * 1. Potentially selecting a coordinator.
+ * 2. Adding the shard to the list of participants.
+ * 3. Also append fields for first statements (ex. startTransaction, readConcern)
+ * if the shard was newly added to the list of participants.
+ */
+ BSONObj attachTxnFieldsIfNeeded(OperationContext* opCtx,
+ const ShardId& shardId,
+ const BSONObj& cmdObj);
- /**
- * Updates the transaction tracking state to allow for a retry attempt on a view resolution
- * error. This includes sending abortTransaction to all cleared participants.
- */
- void onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss);
+ /**
+ * Processes the transaction metadata in the response from the participant if the response
+ * indicates the operation succeeded.
+ */
+ void processParticipantResponse(OperationContext* opCtx,
+ const ShardId& shardId,
+ const BSONObj& responseObj);
- /**
- * Sets the atClusterTime for the current transaction to the latest time in the router's logical
- * clock. Does nothing if the transaction does not have snapshot read concern or an
- * atClusterTime has already been selected and cannot be changed.
- */
- void setDefaultAtClusterTime(OperationContext* opCtx);
+ /**
+ * Returns true if the current transaction can retry on a stale version error from a
+ * contacted shard. This is always true except for an error received by a write that is not
+ * the first overall statement in the sharded transaction. This is because the entire
+ * command will be retried, and shards that were not stale and are targeted again may
+ * incorrectly execute the command a second time.
+ *
+ * Note: Even if this method returns true, the retry attempt may still fail, e.g. if one of
+         * the shards that returned a stale version error was involved in a previously completed
+         * statement for this transaction.
+ *
+ * TODO SERVER-37207: Change batch writes to retry only the failed writes in a batch, to
+ * allow retrying writes beyond the first overall statement.
+ */
+ bool canContinueOnStaleShardOrDbError(StringData cmdName) const;
- /**
- * Returns the global read timestamp for this transaction. Returns boost::none for transactions
- * that don't run at snapshot level read concern or if a timestamp has not yet been selected.
- */
- const boost::optional<AtClusterTime>& getAtClusterTime() const;
+ /**
+ * Updates the transaction state to allow for a retry of the current command on a stale
+ * version error. This includes sending abortTransaction to all cleared participants. Will
+ * throw if the transaction cannot be continued.
+ */
+ void onStaleShardOrDbError(OperationContext* opCtx,
+ StringData cmdName,
+ const Status& errorStatus);
- /**
- * If a coordinator has been selected for the current transaction, returns its id.
- */
- const boost::optional<ShardId>& getCoordinatorId() const;
+ /**
+ * Returns true if the current transaction can retry on a snapshot error. This is only true
+         * on the first command received for a transaction.
+ */
+ bool canContinueOnSnapshotError() const;
- /**
- * If a recovery shard has been selected for the current transaction, returns its id.
- */
- const boost::optional<ShardId>& getRecoveryShardId() const;
+ /**
+ * Resets the transaction state to allow for a retry attempt. This includes clearing all
+ * participants, clearing the coordinator, resetting the global read timestamp, and sending
+ * abortTransaction to all cleared participants. Will throw if the transaction cannot be
+ * continued.
+ */
+ void onSnapshotError(OperationContext* opCtx, const Status& errorStatus);
- /**
- * Commits the transaction.
- *
- * For transactions that only did reads or only wrote to one shard, sends commit directly to the
- * participants and returns the first error response or the last (success) response.
- *
- * For transactions that performed writes to multiple shards, hands off the participant list to
- * the coordinator to do two-phase commit, and returns the coordinator's response.
- */
- BSONObj commitTransaction(OperationContext* opCtx,
- const boost::optional<TxnRecoveryToken>& recoveryToken);
+ /**
+ * Updates the transaction tracking state to allow for a retry attempt on a view resolution
+ * error. This includes sending abortTransaction to all cleared participants.
+ */
+ void onViewResolutionError(OperationContext* opCtx, const NamespaceString& nss);
- /**
- * Sends abort to all participants.
- *
- * Returns the first error response or the last (success) response.
- */
- BSONObj abortTransaction(OperationContext* opCtx);
+ /**
+ * Returns true if the associated transaction is running at snapshot level read concern.
+ */
+ bool mustUseAtClusterTime() const;
- /**
- * Sends abort to all shards in the current participant list. Will retry on retryable errors,
- * but ignores the responses from each shard.
- */
- void implicitlyAbortTransaction(OperationContext* opCtx, const Status& errorStatus);
+ /**
+ * Returns the read timestamp for this transaction. Callers must verify that the read
+ * timestamp has been selected for this transaction before calling this function.
+ */
+ LogicalTime getSelectedAtClusterTime() const;
- /**
- * Returns the participant for this transaction or nullptr if the specified shard is not
- * participant of this transaction.
- */
- Participant* getParticipant(const ShardId& shard);
+ /**
+ * Sets the atClusterTime for the current transaction to the latest time in the router's
+ * logical clock. Does nothing if the transaction does not have snapshot read concern or an
+ * atClusterTime has already been selected and cannot be changed.
+ */
+ void setDefaultAtClusterTime(OperationContext* opCtx);
- /**
- * If a coordinator has been selected for this transaction already, constructs a recovery token,
- * which can be used to resume commit or abort of the transaction from a different router.
- */
- void appendRecoveryToken(BSONObjBuilder* builder) const;
+ /**
+ * If a coordinator has been selected for the current transaction, returns its id.
+ */
+ const boost::optional<ShardId>& getCoordinatorId() const;
- /**
- * Returns a string with the active transaction's transaction number and logical session id
- * (i.e. the transaction id).
- */
- std::string txnIdToString() const;
+ /**
+ * If a recovery shard has been selected for the current transaction, returns its id.
+ */
+ const boost::optional<ShardId>& getRecoveryShardId() const;
- /**
- * Returns the statement id of the latest received command for this transaction.
- */
- StmtId getLatestStmtId() const {
- return _latestStmtId;
- }
+ /**
+ * Commits the transaction.
+ *
+ * For transactions that only did reads or only wrote to one shard, sends commit directly to
+ * the participants and returns the first error response or the last (success) response.
+ *
+ * For transactions that performed writes to multiple shards, hands off the participant list
+ * to the coordinator to do two-phase commit, and returns the coordinator's response.
+ */
+ BSONObj commitTransaction(OperationContext* opCtx,
+ const boost::optional<TxnRecoveryToken>& recoveryToken);
- /**
- * Returns a copy of the timing stats of the transaction router's active transaction.
- */
- TimingStats getTimingStats() const {
- return _timingStats;
- }
+ /**
+ * Sends abort to all participants.
+ *
+ * Returns the first error response or the last (success) response.
+ */
+ BSONObj abortTransaction(OperationContext* opCtx);
-private:
- // Helper to convert the CommitType enum into a human readable string for diagnostics.
- std::string _commitTypeToString(CommitType state) const {
- switch (state) {
- case CommitType::kNotInitiated:
- return "notInitiated";
- case CommitType::kNoShards:
- return "noShards";
- case CommitType::kSingleShard:
- return "singleShard";
- case CommitType::kSingleWriteShard:
- return "singleWriteShard";
- case CommitType::kReadOnly:
- return "readOnly";
- case CommitType::kTwoPhaseCommit:
- return "twoPhaseCommit";
- case CommitType::kRecoverWithToken:
- return "recoverWithToken";
+ /**
+ * Sends abort to all shards in the current participant list. Will retry on retryable errors,
+ * but ignores the responses from each shard.
+ */
+ void implicitlyAbortTransaction(OperationContext* opCtx, const Status& errorStatus);
+
+ /**
+ * If a coordinator has been selected for this transaction already, constructs a recovery
+ * token, which can be used to resume commit or abort of the transaction from a different
+ * router.
+ */
+ void appendRecoveryToken(BSONObjBuilder* builder) const;
+
+ /**
+ * Returns a string with the active transaction's transaction number and logical session id
+ * (i.e. the transaction id).
+ */
+ std::string txnIdToString() const;
+
+ /**
+ * Returns the participant for this transaction or nullptr if the specified shard is not
+         * a participant of this transaction.
+ */
+ const Participant* getParticipant(const ShardId& shard);
+
+ /**
+ * Returns the statement id of the latest received command for this transaction.
+ */
+ StmtId getLatestStmtId() const {
+ return p().latestStmtId;
}
- MONGO_UNREACHABLE;
- }
- /**
- * Prints slow transaction information to the log.
- */
- void _logSlowTransaction(OperationContext* opCtx, TerminationCause terminationCause) const;
+ /**
+ * Returns a copy of the timing stats of the transaction router's active transaction.
+ */
+ const TimingStats& getTimingStats() const {
+ return o().timingStats;
+ }
- /**
- * Returns a string to be logged for slow transactions.
- */
- std::string _transactionInfoForLog(OperationContext* opCtx,
- TerminationCause terminationCause) const;
+ private:
+ /**
+ * Resets the router's state. Used when the router sees a new transaction for the first time.
+ * This is required because we don't create a new router object for each transaction, but
+ * instead reuse the same object across different transactions.
+ */
+ void _resetRouterState(OperationContext* opCtx, const TxnNumber& txnNumber);
- // Shortcut to obtain the id of the session under which this transaction router runs
- const LogicalSessionId& _sessionId() const;
+ /**
+ * Internal method for committing a transaction. Should only throw on failure to send commit.
+ */
+ BSONObj _commitTransaction(OperationContext* opCtx,
+ const boost::optional<TxnRecoveryToken>& recoveryToken);
- /**
- * Resets the router's state. Used when the router sees a new transaction for the first time.
- * This is required because we don't create a new router object for each transaction, but
- * instead reuse the same object across different transactions.
- */
- void _resetRouterState(const TxnNumber& txnNumber);
+ /**
+ * Retrieves the transaction's outcome from the shard specified in the recovery token.
+ */
+ BSONObj _commitWithRecoveryToken(OperationContext* opCtx,
+ const TxnRecoveryToken& recoveryToken);
- /**
- * Internal method for committing a transaction. Should only throw on failure to send commit.
- */
- BSONObj _commitTransaction(OperationContext* opCtx,
- const boost::optional<TxnRecoveryToken>& recoveryToken);
+ /**
+ * Hands off coordinating a two-phase commit across all participants to the coordinator
+ * shard.
+ */
+ BSONObj _handOffCommitToCoordinator(OperationContext* opCtx);
- /**
- * Retrieves the transaction's outcome from the shard specified in the recovery token.
- */
- BSONObj _commitWithRecoveryToken(OperationContext* opCtx,
- const TxnRecoveryToken& recoveryToken);
+ /**
+         * Sets the atClusterTime for the transaction to the greater of the given candidate time
+         * and the user's afterClusterTime, if one was provided.
+ */
+ void _setAtClusterTime(OperationContext* opCtx,
+ const boost::optional<LogicalTime>& afterClusterTime,
+ LogicalTime candidateTime);
- /**
- * Hands off coordinating a two-phase commit across all participants to the coordinator shard.
- */
- BSONObj _handOffCommitToCoordinator(OperationContext* opCtx);
+ /**
+ * Throws NoSuchTransaction if the response from abortTransaction failed with a code other
+ * than NoSuchTransaction. Does not check for write concern errors.
+ */
+ void _assertAbortStatusIsOkOrNoSuchTransaction(
+ const AsyncRequestsSender::Response& response) const;
- /**
- * Sets the given logical time as the atClusterTime for the transaction to be the greater of the
- * given time and the user's afterClusterTime, if one was provided.
- */
- void _setAtClusterTime(const boost::optional<LogicalTime>& afterClusterTime,
- LogicalTime candidateTime);
+ /**
+ * If the transaction's read concern level is snapshot, asserts the participant's
+ * atClusterTime matches the transaction's.
+ */
+ void _verifyParticipantAtClusterTime(const Participant& participant);
- /**
- * Throws NoSuchTransaction if the response from abortTransaction failed with a code other than
- * NoSuchTransaction. Does not check for write concern errors.
- */
- void _assertAbortStatusIsOkOrNoSuchTransaction(
- const AsyncRequestsSender::Response& response) const;
+ /**
+ * Removes all participants created during the current statement from the participant list
+ * and sends abortTransaction to each. Waits for all responses before returning.
+ */
+ void _clearPendingParticipants(OperationContext* opCtx);
- /**
- * Returns all participants created during the current statement.
- */
- std::vector<ShardId> _getPendingParticipants() const;
+ /**
+ * Creates a new participant for the shard.
+ */
+ TransactionRouter::Participant& _createParticipant(OperationContext* opCtx,
+ const ShardId& shard);
- /**
- * Removes all participants created during the current statement from the participant list and
- * sends abortTransaction to each. Waits for all responses before returning.
- */
- void _clearPendingParticipants(OperationContext* opCtx);
+ /**
+ * Sets the new readOnly value for the current participant on the shard.
+ */
+ void _setReadOnlyForParticipant(OperationContext* opCtx,
+ const ShardId& shard,
+ const Participant::ReadOnly readOnly);
- /**
- * Creates a new participant for the shard.
- */
- Participant& _createParticipant(const ShardId& shard);
+ /**
+ * Updates relevant metrics when a new transaction is begun.
+ */
+ void _onNewTransaction(OperationContext* opCtx);
- /**
- * If the transaction's read concern level is snapshot, asserts the participant's atClusterTime
- * matches the transaction's.
- */
- void _verifyParticipantAtClusterTime(const Participant& participant);
+ /**
+ * Updates relevant metrics when a router receives commit for a higher txnNumber than it has
+ * seen so far.
+ */
+ void _onBeginRecoveringDecision(OperationContext* opCtx);
- /**
- * Updates relevant metrics when a new transaction is begun.
- */
- void _onNewTransaction(OperationContext* opCtx);
+ /**
+ * Updates relevant metrics when the router receives an explicit abort from the client.
+ */
+ void _onExplicitAbort(OperationContext* opCtx);
- /**
- * Updates relevant metrics when a router receives commit for a higher txnNumber than it has
- * seen so far.
- */
- void _onBeginRecoveringDecision(OperationContext* opCtx);
+ /**
+ * Updates relevant metrics when the router begins an implicit abort after an error.
+ */
+ void _onImplicitAbort(OperationContext* opCtx, const Status& errorStatus);
- /**
- * Updates relevant metrics when the router receives an explicit abort from the client.
- */
- void _onExplicitAbort(OperationContext* opCtx);
+ /**
+ * Updates relevant metrics when a transaction is about to begin commit.
+ */
+ void _onStartCommit(WithLock wl, OperationContext* opCtx);
- /**
- * Updates relevant metrics when the router begins an implicit abort after an error.
- */
- void _onImplicitAbort(OperationContext* opCtx, const Status& errorStatus);
+ /**
+ * Updates relevant metrics when a transaction receives a successful response for commit.
+ */
+ void _onSuccessfulCommit(OperationContext* opCtx);
- /**
- * Updates relevant metrics when a transaction is about to begin commit.
- */
- void _onStartCommit(OperationContext* opCtx);
+ /**
+ * Updates relevant metrics when commit receives a response with a non-retryable command
+ * error per the retryable writes specification.
+ */
+ void _onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus);
- /**
- * Updates relevant metrics when a transaction receives a successful response for commit.
- */
- void _onSuccessfulCommit(OperationContext* opCtx);
+ /**
+ * The first time this method is called it marks the transaction as over in the router's
+ * diagnostics and will log transaction information if its duration is over the global slowMS
+         * threshold or the transaction log component verbosity >= 1. Only meant to be called when
+ * the router definitively knows the transaction's outcome, e.g. it should not be invoked
+ * after a network error on commit.
+ */
+ void _endTransactionTrackingIfNecessary(OperationContext* opCtx,
+ TerminationCause terminationCause);
- /**
- * Updates relevant metrics when commit receives a response with a non-retryable command error
- * per the retryable writes specification.
- */
- void _onNonRetryableCommitError(OperationContext* opCtx, Status commitStatus);
+ /**
+ * Returns all participants created during the current statement.
+ */
+ std::vector<ShardId> _getPendingParticipants() const;
+
+ /**
+ * Prints slow transaction information to the log.
+ */
+ void _logSlowTransaction(OperationContext* opCtx, TerminationCause terminationCause) const;
+
+ /**
+ * Returns a string to be logged for slow transactions.
+ */
+ std::string _transactionInfoForLog(OperationContext* opCtx,
+ TerminationCause terminationCause) const;
+
+ // Shortcut to obtain the id of the session under which this transaction router runs
+ const LogicalSessionId& _sessionId() const;
+
+ TransactionRouter::PrivateState& p() {
+ return _tr->_p;
+ }
+ const TransactionRouter::PrivateState& p() const {
+ return _tr->_p;
+ }
+ TransactionRouter::ObservableState& o(WithLock) {
+ return _tr->_o;
+ }
+ using Observer::o;
+ }; // class Router
+
+ static Router get(OperationContext* opCtx) {
+ return Router(opCtx);
+ }
+
+ static Observer get(const ObservableSession& osession) {
+ return Observer(osession);
+ }
+
+private:
/**
- * The first time this method is called it marks the transaction as over in the router's
- * diagnostics and will log transaction information if its duration is over the global slowMS
- * threshold or the transaction log componenet verbosity >= 1. Only meant to be called when the
- * router definitively knows the transaction's outcome, e.g. it should not be invoked after a
- * network error on commit.
+ * State in this struct may be read by methods of Observer or Router, and may be written by
+ * methods of Router when they acquire the lock on the opCtx's Client. Access this inside
+ * Observer and Router using the private o() method for reading and (Router only) the
+ * o(WithLock) method for writing.
*/
- void _endTransactionTrackingIfNecessary(OperationContext* opCtx,
- TerminationCause terminationCause);
-
- // The currently active transaction number on this router, if beginOrContinueTxn has been
- // called. Otherwise set to kUninitializedTxnNumber.
- TxnNumber _txnNumber{kUninitializedTxnNumber};
+ struct ObservableState {
+ // The currently active transaction number on this router, if beginOrContinueTxn has been
+ // called. Otherwise set to kUninitializedTxnNumber.
+ TxnNumber txnNumber{kUninitializedTxnNumber};
- // Is updated at commit time to reflect which commit path was taken.
- CommitType _commitType{CommitType::kNotInitiated};
+ // Is updated at commit time to reflect which commit path was taken.
+ CommitType commitType{CommitType::kNotInitiated};
- // Indicates whether this is trying to recover a commitTransaction on the current transaction.
- bool _isRecoveringCommit{false};
+ // Map of current participants of the current transaction.
+ StringMap<Participant> participants;
- // Map of current participants of the current transaction.
- StringMap<Participant> _participants;
+ // The id of participant chosen as the two-phase commit coordinator. If, at commit time,
+ // two-phase commit is required, the participant list is handed off to this shard. Is unset
+ // until the transaction has targeted a participant, and is set to the first participant
+ // targeted. Is reset if the first participant targeted returns a "needs retargeting" error.
+ boost::optional<ShardId> coordinatorId;
- // The id of participant chosen as the two-phase commit coordinator. If, at commit time,
- // two-phase commit is required, the participant list is handed off to this shard. Is unset
- // until the transaction has targeted a participant, and is set to the first participant
- // targeted. Is reset if the first participant targeted returns a "needs retargeting" error.
- boost::optional<ShardId> _coordinatorId;
+ // The read concern the current transaction was started with.
+ repl::ReadConcernArgs readConcernArgs;
- // The id of the recovery participant. Passed in the recoveryToken that is included on responses
- // to the client. Is unset until the transaction has done a write, and is set to the first
- // participant that reports having done a write. Is reset if that participant is removed from
- // the participant list because another participant targeted in the same statement returned a
- // "needs retargeting" error.
- boost::optional<ShardId> _recoveryShardId;
+ // The cluster time of the timestamp all participant shards in the current transaction with
+ // snapshot level read concern must read from. Only set for transactions running with
+ // snapshot level read concern.
+ boost::optional<AtClusterTime> atClusterTime;
- // The read concern the current transaction was started with.
- repl::ReadConcernArgs _readConcernArgs;
+ // String representing the reason a transaction aborted. Either the string name of the error
+ // code that led to an implicit abort or "abort" if the client sent abortTransaction.
+ std::string abortCause;
- // The cluster time of the timestamp all participant shards in the current transaction with
- // snapshot level read concern must read from. Only set for transactions running with snapshot
- // level read concern.
- boost::optional<AtClusterTime> _atClusterTime;
+ // Stats used for calculating durations for the active transaction.
+ TimingStats timingStats;
+ } _o;
- // The statement id of the latest received command for this transaction. For batch writes, this
- // will be the highest stmtId contained in the batch. Incremented by one if new commands do not
- // contain statement ids.
- StmtId _latestStmtId{kDefaultFirstStmtId};
+ /**
+     * State in this struct may be read and written only by methods of the Router, which
+     * accesses it via the private p() accessor. No further locking is required in methods of
+     * the Router.
+ */
+ struct PrivateState {
+ // Indicates whether this is trying to recover a commitTransaction on the current
+ // transaction.
+ bool isRecoveringCommit{false};
- // The statement id of the command that began this transaction. Defaults to zero if no statement
- // id was included in the first command.
- StmtId _firstStmtId{kDefaultFirstStmtId};
+ // The id of the recovery participant. Passed in the recoveryToken that is included on
+ // responses to the client. Is unset until the transaction has done a write, and is set to
+ // the first participant that reports having done a write. Is reset if that participant is
+ // removed from the participant list because another participant targeted in the same
+ // statement returned a "needs retargeting" error.
+ boost::optional<ShardId> recoveryShardId;
- // String representing the reason a transaction aborted. Either the string name of the error
- // code that led to an implicit abort or "abort" if the client sent abortTransaction.
- std::string _abortCause;
+ // The statement id of the latest received command for this transaction. For batch writes,
+ // this will be the highest stmtId contained in the batch. Incremented by one if new
+ // commands do not contain statement ids.
+ StmtId latestStmtId{kDefaultFirstStmtId};
- // Stats used for calculating durations for the active transaction.
- TimingStats _timingStats;
+ // The statement id of the command that began this transaction. Defaults to zero if no
+ // statement id was included in the first command.
+ StmtId firstStmtId{kDefaultFirstStmtId};
- // Track whether commit or abort have been initiated.
- bool _terminationInitiated{false};
+ // Track whether commit or abort have been initiated.
+ bool terminationInitiated{false};
+ } _p;
};
} // namespace mongo
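
For call sites, the practical effect of the header change above is that TransactionRouter::get(opCtx) now hands back a lightweight Router handle by value instead of a raw pointer, so the presence check becomes an explicit-bool test and member access switches from -> to . (as in the transaction_router_test.cpp updates that follow). Below is a rough, self-contained sketch of that handle shape; OperationContext and Session are reduced to stand-ins, and RouterHandle/getRouter are hypothetical names used only for this illustration.

// Sketch of a by-value handle with an explicit bool conversion, mirroring
// the accessor shape the new header exposes. Names are illustrative only.
#include <iostream>

struct Session {
    bool inTransaction{false};
};

struct OperationContext {
    Session* session{nullptr};  // May be null when no session is checked out.
};

class RouterHandle {
public:
    explicit RouterHandle(OperationContext* opCtx)
        : _session(opCtx ? opCtx->session : nullptr) {}

    // A falsy handle means the operation has no checked-out session, replacing
    // the old "pointer is null" check.
    explicit operator bool() const {
        return _session != nullptr;
    }

    void beginOrContinueTxn() {
        _session->inTransaction = true;
    }

private:
    Session* _session;
};

// Analogous to a static get(opCtx) that returns the handle by value.
RouterHandle getRouter(OperationContext* opCtx) {
    return RouterHandle(opCtx);
}

int main() {
    Session session;
    OperationContext opCtx{&session};

    // Call sites test the handle itself and use '.' rather than dereferencing
    // a pointer, as in the test changes below.
    auto txnRouter = getRouter(&opCtx);
    if (txnRouter) {
        txnRouter.beginOrContinueTxn();
    }
    std::cout << std::boolalpha << session.inTransaction << std::endl;
    return 0;
}
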
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 4b886f4a160..3fa8bb08416 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -222,7 +222,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
StartTxnShouldBeAttachedOnlyOnFirstStatementToParticipant) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -271,7 +271,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -320,7 +320,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime)
TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue),
@@ -331,7 +331,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting)
TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndReadConcern) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -416,7 +416,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -477,7 +477,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -519,7 +519,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForReadOnlyTransaction) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -532,12 +532,12 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea
ASSERT_FALSE(txnRouter.getRecoveryShardId());
// The recovery shard is not set if a participant responds with ok but that it is read-only.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
// The recovery shard is not set even if more read-only participants respond.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
txnRouter.beginOrContinueTxn(
@@ -561,13 +561,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnFirstStatement) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
}
@@ -576,7 +576,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnLaterStatement) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -584,11 +584,11 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
// Response to first statement says read-only.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
// Response to second statement says not read-only.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
}
@@ -597,19 +597,19 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSecondParticipantIfSecondParticipantIsFirstToDoAWrite) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says read-only.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT_FALSE(txnRouter.getRecoveryShardId());
// Shard2's response says not read-only.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard2);
}
@@ -618,14 +618,14 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsResetIfRecoveryParticipantIsPendingAndPendingParticipantsAreCleared) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
@@ -642,14 +642,14 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsNotResetIfRecoveryParticipantIsNotPendingAndPendingParticipantsAreCleared) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
@@ -661,7 +661,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// Shard2 responds, it doesn't matter whether it's read-only, just that it's a pending
// participant.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
@@ -678,14 +678,14 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNewTransaction) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
// Shard1's response says not read-only.
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(txnRouter.getRecoveryShardId());
ASSERT_EQ(*txnRouter.getRecoveryShardId(), shard1);
@@ -700,7 +700,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe
TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyThere) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -735,7 +735,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
"invariant") {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -751,7 +751,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfAlreadyOnCmd) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -786,7 +786,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfA
TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFirstStatement) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -803,7 +803,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughNoReadConcernToPart
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -834,7 +834,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -864,7 +864,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel);
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
@@ -879,7 +879,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel);
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
@@ -894,7 +894,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel);
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart),
@@ -906,7 +906,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -946,7 +946,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
CommitTransactionWithNoParticipantsDoesNotSendCommit) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -967,13 +967,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatIsReadOnly) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1003,13 +1003,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatDidAWrite) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1039,15 +1039,15 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
@@ -1088,15 +1088,15 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyToReadOnlyShardsThenWriteShardForMultipleParticipantsOnlyOneDidAWrite) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
@@ -1139,15 +1139,15 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter.processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
@@ -1196,13 +1196,13 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1225,10 +1225,10 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
// Sending commit with a recovery token again should cause the router to use the recovery path
// again.
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
- future = launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ future = launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1262,14 +1262,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
auto txnRouter = TransactionRouter::get(opCtx);
// Simulate recovering a commit with a recovery token and no participants.
{
- txnRouter->beginOrContinueTxn(
- opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1294,23 +1293,23 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// should be sent with the correct participant list.
{
++txnNum;
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1352,23 +1351,23 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// Run a cross-shard transaction with two-phase commit. The commit should be sent with the
// correct participant list.
{
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1400,14 +1399,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
{
++txnNum;
- txnRouter->beginOrContinueTxn(
- opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(shard1);
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
onCommand([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1442,10 +1440,10 @@ TEST_F(TransactionRouterTest, CommitWithEmptyRecoveryToken) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
- ASSERT_THROWS_CODE(txnRouter->commitTransaction(operationContext(), recoveryToken),
+ ASSERT_THROWS_CODE(txnRouter.commitTransaction(operationContext(), recoveryToken),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
@@ -1463,13 +1461,13 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
recoveryToken.setRecoveryShardId(ShardId("magicShard"));
auto future =
- launchAsync([&] { txnRouter->commitTransaction(operationContext(), recoveryToken); });
+ launchAsync([&] { txnRouter.commitTransaction(operationContext(), recoveryToken); });
ShardType shardType;
shardType.setName(shard1.toString());
@@ -1484,7 +1482,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) {
TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1534,7 +1532,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
CannotChangeAtClusterTimeAfterStatementThatSelectedIt) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1597,7 +1595,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipants) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1647,7 +1645,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa
TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAfterFirstCommand) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1669,7 +1667,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft
TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1723,7 +1721,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1768,7 +1766,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1812,7 +1810,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -1866,7 +1864,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1904,11 +1902,11 @@ TEST_F(TransactionRouterTest, AbortThrowsIfNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
ASSERT_THROWS_CODE(
- txnRouter->abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction);
+ txnRouter.abortTransaction(opCtx), DBException, ErrorCodes::NoSuchTransaction);
}
TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
@@ -1922,11 +1920,11 @@ TEST_F(TransactionRouterTest, AbortForSingleParticipant) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+ auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -1955,14 +1953,14 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsAllReturnSuccess) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
- auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+ auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); });
std::map<HostAndPort, boost::optional<bool>> targets = {{hostAndPort1, true},
{hostAndPort2, {}}};
@@ -1998,16 +1996,16 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNoSuchTransa
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard3, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyFalseResponse);
- auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+ auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); });
std::map<HostAndPort, boost::optional<bool>> targets = {
{hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}};
@@ -2047,16 +2045,16 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard3, {});
- txnRouter->processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
- txnRouter->processParticipantResponse(shard3, kOkReadOnlyFalseResponse);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard3, {});
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyFalseResponse);
- auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+ auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); });
std::map<HostAndPort, boost::optional<bool>> targets = {
{hostAndPort1, true}, {hostAndPort2, {}}, {hostAndPort3, {}}};
@@ -2092,7 +2090,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError
TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2151,11 +2149,11 @@ TEST_F(TransactionRouterTest, ImplicitAbortIsNoopWithNoParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
// Should not throw.
- txnRouter->implicitlyAbortTransaction(opCtx, kDummyStatus);
+ txnRouter.implicitlyAbortTransaction(opCtx, kDummyStatus);
}
TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
@@ -2169,12 +2167,12 @@ TEST_F(TransactionRouterTest, ImplicitAbortForSingleParticipant) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
auto future = launchAsync(
- [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); });
+ [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -2202,13 +2200,13 @@ TEST_F(TransactionRouterTest, ImplicitAbortForMultipleParticipants) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard2, {});
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
auto future = launchAsync(
- [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); });
+ [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); });
std::map<HostAndPort, boost::optional<bool>> targets = {{hostAndPort1, true},
{hostAndPort2, {}}};
@@ -2243,12 +2241,12 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
RouterOperationContextSession scopedSession(opCtx);
auto txnRouter = TransactionRouter::get(opCtx);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
- txnRouter->attachTxnFieldsIfNeeded(operationContext(), shard1, {});
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.setDefaultAtClusterTime(operationContext());
+ txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
auto future = launchAsync(
- [&] { return txnRouter->implicitlyAbortTransaction(operationContext(), kDummyStatus); });
+ [&] { return txnRouter.implicitlyAbortTransaction(operationContext(), kDummyStatus); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
ASSERT_EQ(hostAndPort1, request.target);
@@ -2274,12 +2272,12 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) {
WriteConcernOptions writeConcern(10, WriteConcernOptions::SyncMode::NONE, 0);
opCtx->setWriteConcern(writeConcern);
- txnRouter->beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(opCtx);
- txnRouter->attachTxnFieldsIfNeeded(opCtx, shard1, {});
+ txnRouter.setDefaultAtClusterTime(opCtx);
+ txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
- auto future = launchAsync([&] { return txnRouter->abortTransaction(operationContext()); });
+ auto future = launchAsync([&] { return txnRouter.abortTransaction(operationContext()); });
onCommandForPoolExecutor([&](const RemoteCommandRequest& request) {
auto cmdName = request.cmdObj.firstElement().fieldNameStringData();
@@ -2297,7 +2295,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
CannotContinueOnSnapshotOrStaleVersionErrorsWithoutFailpoint) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2318,7 +2316,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsReadConcernOnOpCtx) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2335,7 +2333,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
SubsequentStatementCanSelectAtClusterTimeIfNotSelectedYet) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -2383,16 +2381,16 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernHasNoAtClu
for (auto rcIt : supportedNonSnapshotRCLevels) {
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
// No atClusterTime is placed on the router by default.
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ ASSERT_FALSE(txnRouter.mustUseAtClusterTime());
// Can't compute and set an atClusterTime.
txnRouter.setDefaultAtClusterTime(operationContext());
- ASSERT_FALSE(txnRouter.getAtClusterTime());
+ ASSERT_FALSE(txnRouter.mustUseAtClusterTime());
// Can't continue on snapshot errors.
ASSERT_FALSE(txnRouter.canContinueOnSnapshotError());
@@ -2405,7 +2403,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
for (auto rcIt : supportedNonSnapshotRCLevels) {
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2442,7 +2440,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(clusterTime, rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2463,7 +2461,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres
for (auto rcIt : supportedNonSnapshotRCLevels) {
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(opTime, rcIt.second);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum++, TransactionRouter::TransactionActions::kStart);
@@ -2483,7 +2481,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesIgnoresNoSuchTransaction) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -2513,7 +2511,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesUsesIdempotentRetryPolicy) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -2545,7 +2543,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesFailsWithNoSuchTransactionOnUnexpectedErrors) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -2577,7 +2575,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
"Participant should exist if processing participant response") {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2587,20 +2585,20 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard2, {});
// Simulate response from some participant not in the list.
- txnRouter.processParticipantResponse(shard3, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard3, kOkReadOnlyTrueResponse);
}
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseDoesNotUpdateParticipantIfResponseStatusIsNotOk) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, BSON("ok" << 0));
+ txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0));
ASSERT(TransactionRouter::Participant::ReadOnly::kUnset ==
txnRouter.getParticipant(shard1)->readOnly);
}
@@ -2609,13 +2607,13 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseMarksParticipantAsReadOnlyIfResponseSaysReadOnlyTrue) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
const auto participant = txnRouter.getParticipant(shard1);
@@ -2623,10 +2621,10 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// Further responses with readOnly: true do not change the participant's readOnly field.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly);
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly);
}
@@ -2634,23 +2632,23 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseMarksParticipantAsNotReadOnlyIfFirstResponseSaysReadOnlyFalse) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
const auto participant = txnRouter.getParticipant(shard1);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly);
// Further responses with readOnly: false do not change the participant's readOnly field.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly);
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly);
}
@@ -2659,7 +2657,7 @@ TEST_F(
ProcessParticipantResponseUpdatesParticipantToReadOnlyFalseIfLaterResponseSaysReadOnlyFalse) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2667,14 +2665,14 @@ TEST_F(
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
// First response says readOnly: true.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
const auto participant = txnRouter.getParticipant(shard1);
ASSERT(TransactionRouter::Participant::ReadOnly::kReadOnly == participant->readOnly);
// Later response says readOnly: false.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly);
}
@@ -2683,7 +2681,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseThrowsIfParticipantClaimsToChangeFromReadOnlyFalseToReadOnlyTrue) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2691,23 +2689,24 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
// First response says readOnly: false.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
const auto participant = txnRouter.getParticipant(shard1);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly == participant->readOnly);
// Later response says readOnly: true.
- ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse),
- AssertionException,
- 51113);
+ ASSERT_THROWS_CODE(
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse),
+ AssertionException,
+ 51113);
}
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseThrowsIfParticipantReturnsErrorThenSuccessOnLaterStatement) {
TxnNumber txnNum{3};
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2715,7 +2714,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.attachTxnFieldsIfNeeded(operationContext(), shard1, {});
// First response is an error.
- txnRouter.processParticipantResponse(shard1, BSON("ok" << 0));
+ txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0));
const auto participant = txnRouter.getParticipant(shard1);
ASSERT(TransactionRouter::Participant::ReadOnly::kUnset == participant->readOnly);
@@ -2730,12 +2729,14 @@ TEST_F(TransactionRouterTestWithDefaultSession,
operationContext(), txnNum, TransactionRouter::TransactionActions::kContinue);
// The router should throw regardless of whether the response says readOnly true or false.
- ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse),
- AssertionException,
- 51112);
- ASSERT_THROWS_CODE(txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse),
- AssertionException,
- 51112);
+ ASSERT_THROWS_CODE(
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse),
+ AssertionException,
+ 51112);
+ ASSERT_THROWS_CODE(
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse),
+ AssertionException,
+ 51112);
}
TEST_F(TransactionRouterTestWithDefaultSession,
@@ -2743,7 +2744,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto opCtx = operationContext();
- auto& txnRouter(*TransactionRouter::get(opCtx));
+ auto txnRouter = TransactionRouter::get(opCtx);
txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(opCtx);
@@ -2759,7 +2760,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
future.default_timed_get();
// The participant's response metadata should not be processed since abort has been initiated.
- txnRouter.processParticipantResponse(shard1, BSON("ok" << 0));
+ txnRouter.processParticipantResponse(operationContext(), shard1, BSON("ok" << 0));
ASSERT(TransactionRouter::Participant::ReadOnly::kUnset ==
txnRouter.getParticipant(shard1)->readOnly);
}
@@ -2769,7 +2770,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto opCtx = operationContext();
- auto& txnRouter(*TransactionRouter::get(opCtx));
+ auto txnRouter = TransactionRouter::get(opCtx);
txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(opCtx);
@@ -2781,7 +2782,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
future.default_timed_get();
// The participant's response metadata should not be processed since abort has been initiated.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kUnset ==
txnRouter.getParticipant(shard1)->readOnly);
}
@@ -2791,14 +2792,14 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TxnNumber txnNum{3};
auto opCtx = operationContext();
- auto& txnRouter(*TransactionRouter::get(opCtx));
+ auto txnRouter = TransactionRouter::get(opCtx);
txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(opCtx);
txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
// Process !readonly response to set participant state.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT(TransactionRouter::Participant::ReadOnly::kNotReadOnly ==
txnRouter.getParticipant(shard1)->readOnly);
@@ -2811,7 +2812,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
future.default_timed_get();
// Processing readonly response should not throw since commit has been initiated.
- txnRouter.processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
}
// Begins a transaction with snapshot level read concern and sets a default cluster time.
@@ -2827,7 +2828,7 @@ protected:
void setUp() override {
TransactionRouterTestWithDefaultSession::setUp();
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -2835,7 +2836,7 @@ protected:
};
TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot, AddAtClusterTimeNormal) {
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
BSON("aggregate"
@@ -2851,7 +2852,7 @@ TEST_F(TransactionRouterTestWithDefaultSessionAndStartedSnapshot,
AddingAtClusterTimeOverwritesExistingAfterClusterTime) {
const Timestamp existingAfterClusterTime(1, 1);
- auto& txnRouter(*TransactionRouter::get(operationContext()));
+ auto txnRouter = TransactionRouter::get(operationContext());
auto newCmd = txnRouter.attachTxnFieldsIfNeeded(operationContext(),
shard1,
BSON("aggregate"
@@ -2884,8 +2885,8 @@ protected:
return dynamic_cast<TickSourceMock<Microseconds>*>(getServiceContext()->getTickSource());
}
- TransactionRouter& txnRouter() {
- return *TransactionRouter::get(operationContext());
+ TransactionRouter::Router txnRouter() {
+ return TransactionRouter::get(operationContext());
}
void beginTxnWithDefaultTxnNumber() {
@@ -2936,7 +2937,8 @@ protected:
void explicitAbortInProgress() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard1, kOkReadOnlyFalseResponse);
startCapturingLogMessages();
auto future = launchAsync([&] { txnRouter().abortTransaction(operationContext()); });
@@ -2947,7 +2949,8 @@ protected:
void implicitAbortInProgress() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard1, kOkReadOnlyFalseResponse);
startCapturingLogMessages();
auto future = launchAsync(
@@ -2959,7 +2962,8 @@ protected:
void runCommit(StatusWith<BSONObj> swRes, bool expectRetries = false) {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard1, kOkReadOnlyFalseResponse);
startCapturingLogMessages();
auto future = launchAsync([&] {
@@ -3014,7 +3018,7 @@ protected:
void runSingleShardCommit() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
startCapturingLogMessages();
auto future = launchAsync(
@@ -3026,9 +3030,9 @@ protected:
void runReadOnlyCommit() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter().processParticipantResponse(shard2, kOkReadOnlyTrueResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyTrueResponse);
startCapturingLogMessages();
auto future = launchAsync(
@@ -3041,9 +3045,10 @@ protected:
void runSingleWriteShardCommit() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyTrueResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard2, kOkReadOnlyFalseResponse);
startCapturingLogMessages();
auto future = launchAsync(
@@ -3056,9 +3061,11 @@ protected:
void runTwoPhaseCommit() {
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard1, kOkReadOnlyFalseResponse);
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard2, kOkReadOnlyFalseResponse);
startCapturingLogMessages();
auto future = launchAsync(
@@ -3096,7 +3103,8 @@ protected:
auto beginAndPauseCommit() {
// Commit after targeting one shard so the commit has to do work and can be paused.
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(
+ operationContext(), shard1, kOkReadOnlyFalseResponse);
auto future = launchAsync(
[&] { txnRouter().commitTransaction(operationContext(), kDummyRecoveryToken); });
@@ -3880,15 +3888,15 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalRequestsTargeted) {
// Increases each time transaction fields are attached.
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalRequestsTargeted());
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalRequestsTargeted());
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
ASSERT_EQUALS(3L, routerTxnMetrics()->getTotalRequestsTargeted());
}
@@ -3928,11 +3936,11 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) {
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit());
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
- txnRouter().processParticipantResponse(shard1, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard1, kOkReadOnlyFalseResponse);
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit());
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard2, {});
- txnRouter().processParticipantResponse(shard2, kOkReadOnlyFalseResponse);
+ txnRouter().processParticipantResponse(operationContext(), shard2, kOkReadOnlyFalseResponse);
ASSERT_EQUALS(0L, routerTxnMetrics()->getTotalParticipantsAtCommit());
// Increases after commit begins, before it ends.
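
Note for readers skimming the transaction_router_test.cpp hunks above: every change in this file is the same caller-side migration. TransactionRouter::get(opCtx) now hands back a TransactionRouter::Router handle that is used with '.' rather than '->' and is contextually convertible to bool, and processParticipantResponse() now takes the OperationContext* in addition to the shard id and response. The sketch below gathers that pattern in one place. It is illustration only, not code from the patch: the helper name runSketchTxn and its parameter list are invented for the sketch, while the TransactionRouter calls and their signatures are copied from the hunks above and assume the surrounding mongo headers.

    // Sketch only (would live next to the test fixtures); not part of this patch.
    void runSketchTxn(OperationContext* opCtx,
                      TxnNumber txnNum,
                      const ShardId& shard1,
                      const BSONObj& okReadOnlyFalseResponse,
                      const TxnRecoveryToken& recoveryToken) {
        // The accessor now returns a Router handle instead of a raw pointer.
        auto txnRouter = TransactionRouter::get(opCtx);
        if (!txnRouter) {
            return;  // no router attached to this session; the handle converts to bool
        }
        txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
        txnRouter.setDefaultAtClusterTime(opCtx);
        txnRouter.attachTxnFieldsIfNeeded(opCtx, shard1, {});
        // processParticipantResponse() now takes the OperationContext* as well.
        txnRouter.processParticipantResponse(opCtx, shard1, okReadOnlyFalseResponse);
        txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
        txnRouter.commitTransaction(opCtx, recoveryToken);
    }
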
diff --git a/src/mongo/s/write_ops/batch_write_exec_test.cpp b/src/mongo/s/write_ops/batch_write_exec_test.cpp
index d0c2434c13c..b171341c64c 100644
--- a/src/mongo/s/write_ops/batch_write_exec_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_exec_test.cpp
@@ -640,9 +640,9 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
- txnRouter->setDefaultAtClusterTime(operationContext());
+ txnRouter.setDefaultAtClusterTime(operationContext());
}
void tearDown() override {
diff --git a/src/mongo/s/write_ops/batch_write_op.cpp b/src/mongo/s/write_ops/batch_write_op.cpp
index 5efce61128d..cc1d18e2f30 100644
--- a/src/mongo/s/write_ops/batch_write_op.cpp
+++ b/src/mongo/s/write_ops/batch_write_op.cpp
@@ -248,7 +248,7 @@ BatchWriteOp::BatchWriteOp(OperationContext* opCtx, const BatchedCommandRequest&
: _opCtx(opCtx),
_clientRequest(clientRequest),
_batchTxnNum(_opCtx->getTxnNumber()),
- _inTransaction(TransactionRouter::get(opCtx) != nullptr) {
+ _inTransaction(bool(TransactionRouter::get(opCtx))) {
_writeOps.reserve(_clientRequest.sizeWriteOps());
for (size_t i = 0; i < _clientRequest.sizeWriteOps(); ++i) {
diff --git a/src/mongo/s/write_ops/batch_write_op_test.cpp b/src/mongo/s/write_ops/batch_write_op_test.cpp
index 49b586fc58a..38c874eee90 100644
--- a/src/mongo/s/write_ops/batch_write_op_test.cpp
+++ b/src/mongo/s/write_ops/batch_write_op_test.cpp
@@ -1562,7 +1562,7 @@ public:
_scopedSession.emplace(operationContext());
auto txnRouter = TransactionRouter::get(operationContext());
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
}
diff --git a/src/mongo/s/write_ops/write_op.cpp b/src/mongo/s/write_ops/write_op.cpp
index 97e5b1d1795..fad332f928c 100644
--- a/src/mongo/s/write_ops/write_op.cpp
+++ b/src/mongo/s/write_ops/write_op.cpp
@@ -76,7 +76,7 @@ Status WriteOp::targetWrites(OperationContext* opCtx,
//
// NOTE: Index inserts are currently specially targeted only at the current collection to avoid
// creating collections everywhere.
- const bool inTransaction = TransactionRouter::get(opCtx) != nullptr;
+ const bool inTransaction = bool(TransactionRouter::get(opCtx));
if (swEndpoints.isOK() && swEndpoints.getValue().size() > 1u && !inTransaction) {
swEndpoints = targeter.targetAllShards(opCtx);
}
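
The batch_write_op.cpp and write_op.cpp hunks above make the same mechanical substitution: the value returned by TransactionRouter::get(opCtx) is no longer a raw pointer, so code that only needs to know whether the operation is running inside a transaction converts the handle to bool explicitly instead of comparing against nullptr. A minimal sketch of the idiom; the helper name isInTransactionOnRouter is hypothetical and only illustrates the check:

    // Hypothetical helper, not part of this patch: true when a TransactionRouter
    // is attached to the operation's session.
    bool isInTransactionOnRouter(OperationContext* opCtx) {
        return bool(TransactionRouter::get(opCtx));
    }
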
diff --git a/src/mongo/s/write_ops/write_op_test.cpp b/src/mongo/s/write_ops/write_op_test.cpp
index 3110a74c909..4911ee13f97 100644
--- a/src/mongo/s/write_ops/write_op_test.cpp
+++ b/src/mongo/s/write_ops/write_op_test.cpp
@@ -337,7 +337,7 @@ TEST_F(WriteOpTransactionTests, TargetMultiAllShardsAndErrorSingleChildOp) {
opCtx()->setTxnNumber(kTxnNumber);
auto txnRouter = TransactionRouter::get(opCtx());
- txnRouter->beginOrContinueTxn(
+ txnRouter.beginOrContinueTxn(
opCtx(), kTxnNumber, TransactionRouter::TransactionActions::kStart);
// Do multi-target write op