summaryrefslogtreecommitdiff
path: root/src/mongo/s
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-03-21 03:20:43 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-03-30 19:07:32 +0000
commit8775f61d11bbb49f191e6e1e2713b8a6b5aef73a (patch)
tree1f6e22dedf273fa8822d28b772b2b2c84f7e587c /src/mongo/s
parent2ab888c8fe45f88ce9509a3690e083ad5f35e418 (diff)
downloadmongo-8775f61d11bbb49f191e6e1e2713b8a6b5aef73a.tar.gz
SERVER-63495 Unyielding TransactionRouter never fails except at shutdown and support yielding in cluster commands
Diffstat (limited to 'src/mongo/s')
-rw-r--r--src/mongo/s/async_requests_sender.cpp14
-rw-r--r--src/mongo/s/async_requests_sender_test.cpp4
-rw-r--r--src/mongo/s/commands/cluster_find_and_modify_cmd.cpp15
-rw-r--r--src/mongo/s/commands/cluster_write_cmd.cpp8
-rw-r--r--src/mongo/s/commands/strategy.cpp21
-rw-r--r--src/mongo/s/multi_statement_transaction_requests_sender.cpp16
-rw-r--r--src/mongo/s/query/router_stage_merge.h6
-rw-r--r--src/mongo/s/session_catalog_router.cpp14
-rw-r--r--src/mongo/s/session_catalog_router.h2
-rw-r--r--src/mongo/s/transaction_router.cpp63
-rw-r--r--src/mongo/s/transaction_router.h22
-rw-r--r--src/mongo/s/transaction_router_resource_yielder.cpp33
-rw-r--r--src/mongo/s/transaction_router_resource_yielder.h20
-rw-r--r--src/mongo/s/transaction_router_test.cpp246
14 files changed, 389 insertions, 95 deletions
diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp
index aae48dbb1ab..1fceefc428e 100644
--- a/src/mongo/s/async_requests_sender.cpp
+++ b/src/mongo/s/async_requests_sender.cpp
@@ -117,13 +117,15 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() noexcept {
_resourceYielder->yield(_opCtx);
}
- // Only wait for the next result without popping it, so an error unyielding doesn't discard
- // an already popped response.
- _responseQueue.waitForNonEmpty(_opCtx);
+ // Only wait for the next result without popping it, so an error unyielding doesn't
+ // discard an already popped response.
+ auto waitStatus = _responseQueue.waitForNonEmptyNoThrow(_opCtx);
- if (_resourceYielder) {
- _resourceYielder->unyield(_opCtx);
- }
+ auto unyieldStatus =
+ _resourceYielder ? _resourceYielder->unyieldNoThrow(_opCtx) : Status::OK();
+
+ uassertStatusOK(waitStatus);
+ uassertStatusOK(unyieldStatus);
// There should always be a response ready after the wait above.
auto response = _responseQueue.tryPop();
diff --git a/src/mongo/s/async_requests_sender_test.cpp b/src/mongo/s/async_requests_sender_test.cpp
index 28be4bfa668..39874e7e898 100644
--- a/src/mongo/s/async_requests_sender_test.cpp
+++ b/src/mongo/s/async_requests_sender_test.cpp
@@ -214,7 +214,7 @@ TEST_F(AsyncRequestsSenderTest, HandlesExceptionWhenUnyielding) {
future.default_timed_get();
}
-TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingSkipsUnyield) {
+TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingDoesNotSkipUnyield) {
class CountingResourceYielder : public ResourceYielder {
public:
void yield(OperationContext*) {
@@ -258,7 +258,7 @@ TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingSkipsUnyield) {
future.default_timed_get();
ASSERT_EQ(yielderPointer->timesYielded, 1);
- ASSERT_EQ(yielderPointer->timesUnyielded, 0);
+ ASSERT_EQ(yielderPointer->timesUnyielded, 1);
}
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
index 5931b4b1d73..501a996c84b 100644
--- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
+++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp
@@ -148,10 +148,8 @@ void handleWouldChangeOwningShardErrorRetryableWrite(
const NamespaceString& nss,
const write_ops::FindAndModifyCommandRequest& request,
BSONObjBuilder* result) {
- auto txn =
- txn_api::TransactionWithRetries(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
+ auto txn = txn_api::TransactionWithRetries(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr);
// Shared state for the transaction API use below.
struct SharedBlock {
@@ -245,10 +243,11 @@ void handleWouldChangeOwningShardErrorTransaction(
WouldChangeOwningShardInfo::parseFromCommandError(extraInfo), nss);
try {
- auto txn =
- txn_api::TransactionWithRetries(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
+ auto txn = txn_api::TransactionWithRetries(
+ opCtx,
+ Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
+ TransactionRouterResourceYielder::makeForLocalHandoff());
+
txn.runSync(opCtx,
[sharedBlock](const txn_api::TransactionClient& txnClient,
diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp
index e7fb4233e00..af64f184f90 100644
--- a/src/mongo/s/commands/cluster_write_cmd.cpp
+++ b/src/mongo/s/commands/cluster_write_cmd.cpp
@@ -156,10 +156,8 @@ void handleWouldChangeOwningShardErrorRetryableWrite(OperationContext* opCtx,
// Unset error details because they will be repopulated below.
response->unsetErrDetails();
- auto txn =
- txn_api::TransactionWithRetries(opCtx,
- Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
+ auto txn = txn_api::TransactionWithRetries(
+ opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr);
// Shared state for the transaction API use below.
struct SharedBlock {
@@ -238,7 +236,7 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction(
auto txn =
txn_api::TransactionWithRetries(opCtx,
Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(),
- TransactionRouterResourceYielder::make());
+ TransactionRouterResourceYielder::makeForLocalHandoff());
try {
txn.runSync(opCtx,
diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp
index 379d0f5365d..edefcf546fe 100644
--- a/src/mongo/s/commands/strategy.cpp
+++ b/src/mongo/s/commands/strategy.cpp
@@ -199,13 +199,10 @@ Future<void> invokeInTransactionRouter(std::shared_ptr<RequestExecutionContext>
auto opCtx = rec->getOpCtx();
- auto txnRouter = TransactionRouter::get(opCtx);
- if (!txnRouter) {
- // The command had yielded its session while the error was thrown.
- return;
+ // Abort if the router wasn't yielded, which may happen at global shutdown.
+ if (auto txnRouter = TransactionRouter::get(opCtx)) {
+ txnRouter.implicitlyAbortTransaction(opCtx, status);
}
-
- TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status);
});
}
@@ -443,8 +440,6 @@ private:
// Logs and updates statistics if an error occurs.
void _tapOnError(const Status& status);
- void _tapOnSuccess();
-
ParseAndRunCommand* const _parc;
boost::optional<RouterOperationContextSession> _routerSession;
@@ -1085,15 +1080,6 @@ void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) {
_parc->_errorBuilder->appendElements(errorLabels);
}
-void ParseAndRunCommand::RunInvocation::_tapOnSuccess() {
- auto opCtx = _parc->_rec->getOpCtx();
- if (_parc->_osi && _parc->_osi->getAutocommit() == boost::optional<bool>(false)) {
- tassert(5918604,
- "A successful transaction command must always check out its session after yielding",
- TransactionRouter::get(opCtx));
- }
-}
-
Future<void> ParseAndRunCommand::RunAndRetry::run() {
return makeReadyFutureWith([&] {
// Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown.
@@ -1134,7 +1120,6 @@ Future<void> ParseAndRunCommand::RunInvocation::run() {
return future_util::makeState<RunAndRetry>(_parc).thenWithState(
[](auto* runner) { return runner->run(); });
})
- .tap([this]() { _tapOnSuccess(); })
.tapError([this](Status status) { _tapOnError(status); });
}
diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
index d6f1341584c..c62b4e97da4 100644
--- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp
+++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/operation_context.h"
#include "mongo/s/transaction_router.h"
+#include "mongo/s/transaction_router_resource_yielder.h"
namespace mongo {
@@ -81,13 +82,14 @@ MultiStatementTransactionRequestsSender::MultiStatementTransactionRequestsSender
const ReadPreferenceSetting& readPreference,
Shard::RetryPolicy retryPolicy)
: _opCtx(opCtx),
- _ars(std::make_unique<AsyncRequestsSender>(opCtx,
- std::move(executor),
- dbName,
- attachTxnDetails(opCtx, requests),
- readPreference,
- retryPolicy,
- nullptr /* resourceYielder */)) {}
+ _ars(std::make_unique<AsyncRequestsSender>(
+ opCtx,
+ std::move(executor),
+ dbName,
+ attachTxnDetails(opCtx, requests),
+ readPreference,
+ retryPolicy,
+ TransactionRouterResourceYielder::makeForRemoteCommand())) {}
MultiStatementTransactionRequestsSender::~MultiStatementTransactionRequestsSender() {
invariant(_opCtx);
diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h
index 72f1aec26fb..c095f1b146a 100644
--- a/src/mongo/s/query/router_stage_merge.h
+++ b/src/mongo/s/query/router_stage_merge.h
@@ -35,6 +35,7 @@
#include "mongo/s/query/blocking_results_merger.h"
#include "mongo/s/query/cluster_client_cursor_params.h"
#include "mongo/s/query/router_exec_stage.h"
+#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/util/net/hostandport.h"
namespace mongo {
@@ -49,7 +50,10 @@ public:
std::shared_ptr<executor::TaskExecutor> executor,
AsyncResultsMergerParams&& armParams)
: RouterExecStage(opCtx),
- _resultsMerger(opCtx, std::move(armParams), std::move(executor), nullptr) {}
+ _resultsMerger(opCtx,
+ std::move(armParams),
+ std::move(executor),
+ TransactionRouterResourceYielder::makeForRemoteCommand()) {}
StatusWith<ClusterQueryResult> next() final {
return _resultsMerger.next(getOpCtx());
diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp
index 1207f371dbf..e8b9470ca17 100644
--- a/src/mongo/s/session_catalog_router.cpp
+++ b/src/mongo/s/session_catalog_router.cpp
@@ -73,16 +73,20 @@ RouterOperationContextSession::RouterOperationContextSession(OperationContext* o
RouterOperationContextSession::~RouterOperationContextSession() {
if (auto txnRouter = TransactionRouter::get(_opCtx)) {
- // Only stash if the session wasn't yielded.
- txnRouter.stash(_opCtx);
+ // Only stash if the session wasn't yielded. This should only happen at global shutdown.
+ txnRouter.stash(_opCtx, TransactionRouter::StashReason::kDone);
}
};
-void RouterOperationContextSession::checkIn(OperationContext* opCtx) {
+void RouterOperationContextSession::checkIn(OperationContext* opCtx,
+ OperationContextSession::CheckInReason reason) {
invariant(OperationContextSession::get(opCtx));
- TransactionRouter::get(opCtx).stash(opCtx);
- OperationContextSession::checkIn(opCtx);
+ TransactionRouter::get(opCtx).stash(opCtx,
+ reason == OperationContextSession::CheckInReason::kYield
+ ? TransactionRouter::StashReason::kYield
+ : TransactionRouter::StashReason::kDone);
+ OperationContextSession::checkIn(opCtx, reason);
}
void RouterOperationContextSession::checkOut(OperationContext* opCtx) {
diff --git a/src/mongo/s/session_catalog_router.h b/src/mongo/s/session_catalog_router.h
index 301b40fffca..ecb4771d3c7 100644
--- a/src/mongo/s/session_catalog_router.h
+++ b/src/mongo/s/session_catalog_router.h
@@ -68,7 +68,7 @@ public:
* Check-in may only be called if the session has actually been checked out previously and
* similarly check-out may only be called if the session is not checked out already.
*/
- static void checkIn(OperationContext* opCtx);
+ static void checkIn(OperationContext* opCtx, OperationContextSession::CheckInReason reason);
static void checkOut(OperationContext* opCtx);
private:
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index e2739a36778..10ecdb05442 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -57,6 +57,7 @@
#include "mongo/s/router_transactions_metrics.h"
#include "mongo/s/shard_cannot_refresh_due_to_locks_held_exception.h"
#include "mongo/util/assert_util.h"
+#include "mongo/util/exit.h"
#include "mongo/util/fail_point.h"
#include "mongo/util/log_with_sampling.h"
#include "mongo/util/net/socket_utils.h"
@@ -1040,13 +1041,18 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx,
_updateLastClientInfo(opCtx->getClient());
}
-void TransactionRouter::Router::stash(OperationContext* opCtx) {
+void TransactionRouter::Router::stash(OperationContext* opCtx, StashReason reason) {
if (!isInitialized()) {
return;
}
- auto tickSource = opCtx->getServiceContext()->getTickSource();
stdx::lock_guard<Client> lk(*opCtx->getClient());
+
+ if (reason == StashReason::kYield) {
+ ++o(lk).activeYields;
+ }
+
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
o(lk).metricsTracker->trySetInactive(tickSource, tickSource->getTicks());
}
@@ -1055,11 +1061,21 @@ void TransactionRouter::Router::unstash(OperationContext* opCtx) {
return;
}
- // Validate that the transaction number hasn't changed.
- if (o().txnNumberAndRetryCounter.getTxnNumber() != opCtx->getTxnNumber()) {
- uasserted(ErrorCodes::NoSuchTransaction,
- "The requested operation has a different transaction number than the active "
- "transaction.");
+ // Validate that the transaction number hasn't changed while we were yielded. This is guaranteed
+ // by the activeYields check when beginning a new transaction.
+ invariant(opCtx->getTxnNumber(), "Cannot unstash without a transaction number");
+ invariant(o().txnNumberAndRetryCounter.getTxnNumber() == opCtx->getTxnNumber(),
+ str::stream()
+ << "The requested operation has a different transaction number than the active "
+ "transaction. Active: "
+ << o().txnNumberAndRetryCounter.getTxnNumber()
+ << ", operation: " << *opCtx->getTxnNumber());
+
+ {
+ stdx::lock_guard<Client> lg(*opCtx->getClient());
+ --o(lg).activeYields;
+ invariant(o(lg).activeYields >= 0,
+ str::stream() << "Invalid activeYields: " << o(lg).activeYields);
}
auto tickSource = opCtx->getServiceContext()->getTickSource();
@@ -1242,12 +1258,29 @@ BSONObj TransactionRouter::Router::_commitTransaction(
return _handOffCommitToCoordinator(opCtx);
}
+// Returns if the opCtx has yielded its session and failed to unyield it, which may happen during
+// methods that send network requests at global shutdown when running on a mongod.
+bool failedToUnyieldSessionAtShutdown(OperationContext* opCtx) {
+ if (!TransactionRouter::get(opCtx)) {
+ invariant(globalInShutdownDeprecated());
+ return true;
+ }
+ return false;
+}
+
BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) {
invariant(isInitialized());
// Update stats on scope exit so the transaction is considered "active" while waiting on abort
// responses.
- ScopeGuard updateStatsGuard([&] { _onExplicitAbort(opCtx); });
+ ScopeGuard updateStatsGuard([&] {
+ if (failedToUnyieldSessionAtShutdown(opCtx)) {
+ // It's unsafe to continue without the session checked out. This should only happen at
+ // global shutdown, so it's acceptable to skip updating stats.
+ return;
+ }
+ _onExplicitAbort(opCtx);
+ });
// The router has yet to send any commands to a remote shard for this transaction.
// Return the same error that would have been returned by a shard.
@@ -1325,7 +1358,14 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC
// Update stats on scope exit so the transaction is considered "active" while waiting on abort
// responses.
- ScopeGuard updateStatsGuard([&] { _onImplicitAbort(opCtx, status); });
+ ScopeGuard updateStatsGuard([&] {
+ if (failedToUnyieldSessionAtShutdown(opCtx)) {
+ // It's unsafe to continue without the session checked out. This should only happen at
+ // global shutdown, so it's acceptable to skip updating stats.
+ return;
+ }
+ _onImplicitAbort(opCtx, status);
+ });
if (o().participants.empty()) {
return;
@@ -1396,6 +1436,11 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con
void TransactionRouter::Router::_resetRouterState(
OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
stdx::lock_guard<Client> lk(*opCtx->getClient());
+
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Cannot start a new transaction while the previous is yielded",
+ o(lk).activeYields == 0);
+
o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
o(lk).txnNumberAndRetryCounter.setTxnRetryCounter(
*txnNumberAndRetryCounter.getTxnRetryCounter());
diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h
index 276a473aead..a7b49c5eb12 100644
--- a/src/mongo/s/transaction_router.h
+++ b/src/mongo/s/transaction_router.h
@@ -71,6 +71,9 @@ public:
kRecoverWithToken,
};
+ // The reason why TransactionRouter::Router::stash() is called.
+ enum class StashReason { kDone, kYield };
+
// The default value to use as the statement id of the first command in the transaction if none
// was sent.
static const StmtId kDefaultFirstStmtId = 0;
@@ -326,6 +329,15 @@ public:
return o().txnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber;
}
+ /**
+ * Returns if this TransactionRouter instance can be reaped. Always true unless an operation
+ * has yielded the router and has not unyielded yet. We cannot reap the instance in that
+ * case or the unyield would check out a different TransactionRouter than it yielded.
+ */
+ auto canBeReaped() const {
+ return o().activeYields == 0;
+ }
+
protected:
explicit Observer(TransactionRouter* tr) : _tr(tr) {}
@@ -372,9 +384,10 @@ public:
TransactionActions action);
/**
- * Updates transaction diagnostics when the transaction's session is checked in.
+ * Updates transaction diagnostics and, if necessary, the number of active yielders when the
+ * transaction's session is checked in.
*/
- void stash(OperationContext* opCtx);
+ void stash(OperationContext* opCtx, StashReason reason);
/**
* Validates transaction state is still compatible after a yield.
@@ -775,6 +788,11 @@ private:
// certain transaction events. Unset until the transaction router has processed at least one
// transaction command.
boost::optional<MetricsTracker> metricsTracker;
+
+ // How many operations that checked out the router's session have currently yielded it. The
+ // transaction number cannot be changed until this returns to 0, otherwise we cannot
+ // guarantee that unyielding the session cannot fail.
+ int32_t activeYields{0};
} _o;
/**
diff --git a/src/mongo/s/transaction_router_resource_yielder.cpp b/src/mongo/s/transaction_router_resource_yielder.cpp
index 252231483a4..6d5a5e0c2a4 100644
--- a/src/mongo/s/transaction_router_resource_yielder.cpp
+++ b/src/mongo/s/transaction_router_resource_yielder.cpp
@@ -30,25 +30,52 @@
#include "mongo/s/transaction_router_resource_yielder.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/s/is_mongos.h"
#include "mongo/s/session_catalog_router.h"
+#include "mongo/util/exit.h"
namespace mongo {
-std::unique_ptr<TransactionRouterResourceYielder> TransactionRouterResourceYielder::make() {
+std::unique_ptr<TransactionRouterResourceYielder>
+TransactionRouterResourceYielder::makeForLocalHandoff() {
+ return std::make_unique<TransactionRouterResourceYielder>();
+}
+
+std::unique_ptr<TransactionRouterResourceYielder>
+TransactionRouterResourceYielder::makeForRemoteCommand() {
+ if (isMongos()) {
+ // Mongos cannot target itself so it does not need to yield for remote commands.
+ return nullptr;
+ }
return std::make_unique<TransactionRouterResourceYielder>();
}
void TransactionRouterResourceYielder::yield(OperationContext* opCtx) {
Session* const session = OperationContextSession::get(opCtx);
if (session) {
- RouterOperationContextSession::checkIn(opCtx);
+ RouterOperationContextSession::checkIn(opCtx,
+ OperationContextSession::CheckInReason::kYield);
}
_yielded = (session != nullptr);
}
void TransactionRouterResourceYielder::unyield(OperationContext* opCtx) {
if (_yielded) {
- RouterOperationContextSession::checkOut(opCtx);
+ // Code that uses the TransactionRouter assumes it will only run with it, so check back out
+ // the session ignoring interruptions, except at global shutdown to prevent stalling
+ // shutdown. Unyield should always run with no resources held, so there shouldn't be a risk
+ // of deadlock.
+ try {
+ opCtx->runWithoutInterruptionExceptAtGlobalShutdown(
+ [&] { RouterOperationContextSession::checkOut(opCtx); });
+ } catch (const DBException&) {
+ // This can throw at global shutdown, so calling code that catches errors may
+ // unexpectedly run without a session checked out. This is assumed safe because the
+ // process is shutting down and can't do any meaningful work. This invariant is to
+ // safeguard that assumption.
+ invariant(globalInShutdownDeprecated());
+ throw;
+ }
}
}
diff --git a/src/mongo/s/transaction_router_resource_yielder.h b/src/mongo/s/transaction_router_resource_yielder.h
index 173c588c73a..7334d76b178 100644
--- a/src/mongo/s/transaction_router_resource_yielder.h
+++ b/src/mongo/s/transaction_router_resource_yielder.h
@@ -41,9 +41,22 @@ namespace mongo {
class TransactionRouterResourceYielder : public ResourceYielder {
public:
/**
- * Returns a newly allocated yielder.
+ * The next two methods return a newly allocated yielder for the given yielding scenario.
+ *
+ * Use makeForLocalHandoff() when the operation expected to check out the session next is in the
+ * same local process, e.g. a mongos command in a transaction is yielding so a command spawned
+ * by the transaction API can check out the same session.
+ *
+ * Use makeForRemoteCommand() when yielding waiting for a possibly remote response to be
+ * received. If the local process serves both a router and mongod role, the local process may
+ * need to check out the same session to produce the response.
+ *
+ * The only difference in behavior is that makeForLocalHandoff() always returns a yielder and
+ * makeForRemoteCommand() returns nullptr if the local process is a mongos because a mongos
+ * cannot target itself.
*/
- static std::unique_ptr<TransactionRouterResourceYielder> make();
+ static std::unique_ptr<TransactionRouterResourceYielder> makeForLocalHandoff();
+ static std::unique_ptr<TransactionRouterResourceYielder> makeForRemoteCommand();
/**
* If the opCtx has a checked out RouterOperationContextSession, yields it.
@@ -52,7 +65,8 @@ public:
/**
* If the opCtx had previously checked out RouterOperationContextSession, checks it back out.
- * Note this may throw if the opCtx has been interrupted.
+ * Note this may throw if the opCtx is interrupted at global shutdown. Otherwise it should never
+ * fail.
*/
void unyield(OperationContext* opCtx) override;
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 89c3ad2675b..b132e05bdd7 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -233,6 +233,7 @@ private:
TEST_F(TransactionRouterTestWithDefaultSession,
StartTxnShouldBeAttachedOnlyOnFirstStatementToParticipant) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -271,6 +272,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -309,6 +311,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime)
TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
@@ -320,6 +323,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting)
TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndReadConcern) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -386,6 +390,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe
TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -409,6 +414,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
}
TxnNumber txnNum2{5};
+ operationContext()->setTxnNumber(txnNum2);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -434,6 +440,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState)
TEST_F(TransactionRouterTestWithDefaultSession,
DoNotAttachTxnRetryCounterIfTxnRetryCounterIsDefault) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -452,6 +459,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -477,6 +485,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
}
TxnNumber txnNum2{5};
+ operationContext()->setTxnNumber(txnNum2);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -494,6 +503,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) {
TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForReadOnlyTransaction) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -536,6 +546,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea
TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnFirstStatement) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -551,6 +562,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnLaterStatement) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -572,6 +584,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsSetToSecondParticipantIfSecondParticipantIsFirstToDoAWrite) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -593,6 +606,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsResetIfRecoveryParticipantIsPendingAndPendingParticipantsAreCleared) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -617,6 +631,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RecoveryShardIsNotResetIfRecoveryParticipantIsNotPendingAndPendingParticipantsAreCleared) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -653,6 +668,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNewTransaction) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -675,6 +691,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe
TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyThere) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -702,6 +719,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
CrashesIfCmdHasDifferentTxnNumber,
"invariant") {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -715,26 +733,38 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
<< "txnNumber" << TxnNumber(10)));
}
-TEST_F(TransactionRouterTestWithDefaultSession, CannotUnstashWithDifferentTxnNumber) {
+DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotUnstashWithDifferentTxnNumber,
+ "The requested operation has a different transaction number") {
TxnNumber txnNum{3};
- operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
- // Simulates the user beginning another transaction while the previous one has not completed.
+ operationContext()->setTxnNumber(txnNum + 1);
+ txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield);
+ txnRouter.unstash(operationContext());
+}
+
+DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
+ CannotUnstashWithNoTxnNumber,
+ "Cannot unstash without a transaction number") {
+ TxnNumber txnNum{3};
+
+ auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
- operationContext(), txnNum + 1, TransactionRouter::TransactionActions::kStart);
+ operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
- ASSERT_THROWS_CODE(
- txnRouter.unstash(operationContext()), AssertionException, ErrorCodes::NoSuchTransaction);
+ txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield);
+ txnRouter.unstash(operationContext());
}
TEST_F(TransactionRouterTestWithDefaultSession, SuccessfullyUnstashWithMatchingTxnNumber) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -742,11 +772,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, SuccessfullyUnstashWithMatchingT
txnRouter.setDefaultAtClusterTime(operationContext());
operationContext()->setTxnNumber(txnNum);
+ txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield);
txnRouter.unstash(operationContext());
}
TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfAlreadyOnCmd) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -778,6 +810,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SameAPIParametersAfterFirstState
apiParameters.setAPIVersion("1");
APIParameters::get(operationContext()) = apiParameters;
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -795,6 +828,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirst
apiParameters.setAPIVersion("1");
APIParameters::get(operationContext()) = apiParameters;
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -816,6 +850,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme
apiParameters.setAPIVersion("1");
APIParameters::get(operationContext()) = apiParameters;
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -834,6 +869,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme
TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFirstStatement) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -851,6 +887,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToP
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -877,6 +914,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
repl::ReadConcernArgs(kAfterClusterTime, boost::none);
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -902,6 +940,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel);
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
@@ -917,6 +956,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel);
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
@@ -932,6 +972,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel);
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
ASSERT_THROWS_CODE(
txnRouter.beginOrContinueTxn(
@@ -943,6 +984,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter
TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -985,6 +1027,7 @@ void checkWriteConcern(const BSONObj& cmdObj, const WriteConcernOptions& expecte
TEST_F(TransactionRouterTestWithDefaultSession,
CommitTransactionWithNoParticipantsDoesNotSendCommit) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1006,6 +1049,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatIsReadOnly) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1042,6 +1086,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForSingleParticipantThatDidAWrite) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1078,6 +1123,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1128,6 +1174,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCoordinateCommitForMultipleParticipantsOnlyOneDidAWrite) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1176,6 +1223,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1291,6 +1339,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) {
TEST_F(TransactionRouterTestWithDefaultSession,
CrossShardTxnCommitWorksAfterRecoveryCommitForPreviousTransaction) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
opCtx->setTxnNumber(txnNum);
@@ -1333,6 +1382,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// should be sent with the correct participant list.
{
++txnNum;
+ operationContext()->setTxnNumber(txnNum);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1380,6 +1430,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
RouterShouldWorkAsRecoveryRouterEvenIfItHasSeenPreviousTransactions) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
opCtx->setTxnNumber(txnNum);
@@ -1439,7 +1490,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
// participants.
{
++txnNum;
-
+ operationContext()->setTxnNumber(txnNum);
txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit);
TxnRecoveryToken recoveryToken;
@@ -1524,6 +1575,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) {
TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1572,6 +1624,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime
TEST_F(TransactionRouterTestWithDefaultSession,
CannotChangeAtClusterTimeAfterStatementThatSelectedIt) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1633,6 +1686,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipants) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1683,6 +1737,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa
TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAfterFirstCommand) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1706,6 +1761,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft
TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
+
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
operationContext(), txnNum, TransactionRouter::TransactionActions::kStart);
@@ -1738,6 +1795,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate
repl::ReadConcernArgs::get(operationContext()) =
repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern);
TxnNumber txnNum2{5};
+ operationContext()->setTxnNumber(txnNum2);
txnRouter.beginOrContinueTxn(
operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart);
txnRouter.setDefaultAtClusterTime(operationContext());
@@ -1759,6 +1817,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate
TEST_F(TransactionRouterTestWithDefaultSession,
AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1804,6 +1863,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClearedOnStaleError) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1848,6 +1908,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea
TEST_F(TransactionRouterTestWithDefaultSession,
RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -1901,6 +1962,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve
auto otherCmds = {"find", "distinct", "aggregate", "killCursors", "getMore"};
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2127,6 +2189,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError
TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2304,6 +2367,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) {
TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
@@ -2332,6 +2396,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) {
TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirstOp) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2384,6 +2449,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs
TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsReadConcernOnOpCtx) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2401,6 +2467,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsRe
TEST_F(TransactionRouterTestWithDefaultSession,
SubsequentStatementCanSelectAtClusterTimeIfNotSelectedYet) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2547,6 +2614,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres
TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesIgnoresNoSuchTransaction) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2577,6 +2645,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesUsesIdempotentRetryPolicy) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2609,6 +2678,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
AbortBetweenStatementRetriesFailsWithNoSuchTransactionOnUnexpectedErrors) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2641,6 +2711,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseInvariantsIfParticipantDoesNotExist,
"Participant should exist if processing participant response") {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2658,6 +2729,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseDoesNotUpdateParticipantIfResponseStatusIsNotOk) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2673,6 +2745,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseMarksParticipantAsReadOnlyIfResponseSaysReadOnlyTrue) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2698,6 +2771,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseMarksParticipantAsNotReadOnlyIfFirstResponseSaysReadOnlyFalse) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2723,6 +2797,7 @@ TEST_F(
TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseUpdatesParticipantToReadOnlyFalseIfLaterResponseSaysReadOnlyFalse) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2749,6 +2824,7 @@ TEST_F(
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseThrowsIfParticipantClaimsToChangeFromReadOnlyFalseToReadOnlyTrue) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2774,6 +2850,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantResponseThrowsIfParticipantReturnsErrorThenSuccessOnLaterStatement) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto txnRouter = TransactionRouter::get(operationContext());
txnRouter.beginOrContinueTxn(
@@ -2811,6 +2888,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantSkipsValidationIfAbortAlreadyInitiated) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
@@ -2837,6 +2915,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantSkipsValidationIfImplicitAbortAlreadyInitiated) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
@@ -2859,6 +2938,7 @@ TEST_F(TransactionRouterTestWithDefaultSession,
TEST_F(TransactionRouterTestWithDefaultSession,
ProcessParticipantSkipsValidationIfCommitAlreadyInitiated) {
TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
auto opCtx = operationContext();
auto txnRouter = TransactionRouter::get(opCtx);
@@ -2884,6 +2964,111 @@ TEST_F(TransactionRouterTestWithDefaultSession,
txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse);
}
+TEST_F(TransactionRouterTestWithDefaultSession, CannotAdvanceTxnNumberWithActiveYielder) {
+ TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
+ auto opCtx = operationContext();
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield);
+
+ // We can beginOrContinueTxn at the active txnNumber. This simulates a yielded session being
+ // checked out by a different operation for that same transaction.
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue);
+
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield);
+
+ // A higher txnNumber cannot be used.
+ ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn(
+ opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+
+ ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn(
+ opCtx, txnNum + 1, TransactionRouter::TransactionActions::kCommit),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+
+ TransactionRouter::get(opCtx).unstash(opCtx);
+
+ // A higher txnNumber still cannot be used because there is an outstanding yielder.
+ ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn(
+ opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+
+ ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn(
+ opCtx, txnNum + 1, TransactionRouter::TransactionActions::kCommit),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+
+ TransactionRouter::get(opCtx).unstash(opCtx);
+
+ // A non-yielding stash does not affect whether a new transaction can begin. This simulates an
+ // operation completing. Stash multiple times to verify it does not bring the activeYields
+ // counter below 0 and trigger a crash.
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+
+ // Now the router can advance.
+ txnRouter.beginOrContinueTxn(opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart);
+}
+
+DEATH_TEST_F(TransactionRouterTestWithDefaultSession,
+ ActiveYieldersCannotGoBelowZero,
+ "Invalid activeYields") {
+ TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
+ auto opCtx = operationContext();
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield);
+ txnRouter.unstash(opCtx);
+ txnRouter.unstash(opCtx);
+}
+
+TEST_F(TransactionRouterTestWithDefaultSession, CannotBeReapedWithActiveYielders) {
+ TxnNumber txnNum{3};
+ operationContext()->setTxnNumber(txnNum);
+ auto opCtx = operationContext();
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+
+ // Can be reaped initially.
+ ASSERT(txnRouter.canBeReaped());
+
+ // Can be reaped after a non-yield stash.
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+ ASSERT(txnRouter.canBeReaped());
+
+ // Cannot be reaped after a yield stash.
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield);
+ ASSERT_FALSE(txnRouter.canBeReaped());
+
+ // Cannot be reaped after multiple yield stashes until all corresponding unstashes.
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield);
+ ASSERT_FALSE(txnRouter.canBeReaped());
+
+ txnRouter.unstash(opCtx);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+ ASSERT_FALSE(txnRouter.canBeReaped());
+
+ txnRouter.unstash(opCtx);
+ txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone);
+ ASSERT(txnRouter.canBeReaped());
+}
+
// Begins a transaction with snapshot level read concern and sets a default cluster time.
class TransactionRouterTestWithDefaultSessionAndStartedSnapshot
: public TransactionRouterTestWithDefaultSession {
@@ -2951,6 +3136,8 @@ protected:
void setUp() override {
TransactionRouterTestWithDefaultSession::setUp();
repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs();
+ _txnRouter = TransactionRouter::get(operationContext());
+ operationContext()->setTxnNumber(kTxnNumber);
}
TickSourceMock<Microseconds>* tickSource() {
@@ -2965,8 +3152,9 @@ protected:
return dynamic_cast<ClockSourceMock*>(getServiceContext()->getPreciseClockSource());
}
- TransactionRouter::Router txnRouter() {
- return TransactionRouter::get(operationContext());
+ TransactionRouter::Router& txnRouter() {
+ invariant(_txnRouter);
+ return *_txnRouter;
}
void beginTxnWithDefaultTxnNumber() {
@@ -3221,7 +3409,7 @@ protected:
assertTimeActiveIs(Microseconds(timeActive));
assertTimeInactiveIs(Microseconds(timeInactive));
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(Microseconds(150));
assertTimeActiveIs(Microseconds(timeActive));
@@ -3233,12 +3421,15 @@ protected:
assertTimeActiveIs(kDefaultTimeActive);
assertTimeInactiveIs(Microseconds(0));
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(kDefaultTimeInactive);
assertTimeActiveIs(kDefaultTimeActive);
assertTimeInactiveIs(kDefaultTimeInactive);
}
+
+private:
+ boost::optional<TransactionRouter::Router> _txnRouter;
};
//
@@ -3404,7 +3595,7 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsTimeActiveAndInactive) {
tickSource()->advance(Microseconds(111111));
assertTimeActiveIs(Microseconds(111111));
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(Microseconds(222222));
assertTimeInactiveIs(Microseconds(222222));
@@ -3926,6 +4117,7 @@ TEST_F(TransactionRouterMetricsTest, CommitDurationResetByNewTransaction) {
future.default_timed_get();
// Start a new transaction and verify the commit duration was reset.
+ operationContext()->setTxnNumber(kTxnNumber + 1);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
@@ -4050,7 +4242,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceSeparatelyAndSu
assertDurationIs(Microseconds(50));
// Only timeInactive will advance while a txn is stashed.
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(Microseconds(100));
assertTimeActiveIs(Microseconds(50));
assertTimeInactiveIs(Microseconds(100));
@@ -4080,7 +4272,7 @@ TEST_F(TransactionRouterMetricsTest, StashIsIdempotent) {
assertTimeActiveIs(kDefaultTimeActive);
assertTimeInactiveIs(kDefaultTimeInactive);
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
// Only timeInactive can advance.
@@ -4088,7 +4280,7 @@ TEST_F(TransactionRouterMetricsTest, StashIsIdempotent) {
assertTimeActiveIs(kDefaultTimeActive);
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
// Still only timeInactive can advance.
@@ -4106,7 +4298,7 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedStashedTxn) {
assertTimeInactiveIs(kDefaultTimeInactive);
assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive);
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
// At shutdown transactions are implicitly aborted without being continued so a transaction may
// be stashed when aborting, which should still lead to durations in a consistent state.
@@ -4153,7 +4345,7 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedEndedTxn) {
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit);
runCommit(kDummyOkRes);
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
// At shutdown transactions are implicitly aborted without being continued, so an "ended"
// transaction (i.e. committed or aborted) may be implicitly aborted again. This shouldn't
@@ -4211,7 +4403,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom
assertTimeInactiveIs(kDefaultTimeInactive);
// timeInactive can advance.
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(Microseconds(100));
assertTimeActiveIs(kDefaultTimeActive + Microseconds(100));
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
@@ -4226,7 +4418,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100));
// timeInactive can advance.
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
tickSource()->advance(Microseconds(100));
assertTimeActiveIs(kDefaultTimeActive + Microseconds(200));
assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200));
@@ -4293,7 +4485,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginRecover) {
TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) {
beginRecoverCommitWithDefaultTxnNumber();
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
@@ -4301,7 +4493,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) {
TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAfterStash) {
beginRecoverCommitWithDefaultTxnNumber();
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue);
@@ -4326,14 +4518,14 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_AreNotCumulative) {
// Test inactive.
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 2, TransactionRouter::TransactionActions::kStart);
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 3, TransactionRouter::TransactionActions::kStart);
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
@@ -4357,7 +4549,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_UnknownCommit) {
TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ImplicitAbortForStashedTxn) {
beginTxnWithDefaultTxnNumber();
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive());
@@ -4396,7 +4588,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAndStashForEndedT
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive());
- txnRouter().stash(operationContext());
+ txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone);
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen());
ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive());
@@ -4567,6 +4759,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalStarted_IsCumulative) {
beginTxnWithDefaultTxnNumber();
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalStarted());
+ operationContext()->setTxnNumber(kTxnNumber + 1);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalStarted());
@@ -4635,6 +4828,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_IsCumulative) {
runCommit(kDummyOkRes);
ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalCommitted());
+ operationContext()->setTxnNumber(kTxnNumber + 1);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
runCommit(kDummyOkRes);
@@ -4811,6 +5005,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) {
ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalParticipantsAtCommit());
// Is cumulative across transactions.
+ operationContext()->setTxnNumber(kTxnNumber + 1);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {});
@@ -4893,6 +5088,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCommitTypeStatsSuccessfulDurat
.successfulDurationMicros.load());
// Start a new transaction and verify that successful commit duration is cumulative.
+ operationContext()->setTxnNumber(kTxnNumber + 1);
txnRouter().beginOrContinueTxn(
operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart);
txnRouter().setDefaultAtClusterTime(operationContext());