From 8775f61d11bbb49f191e6e1e2713b8a6b5aef73a Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Mon, 21 Mar 2022 03:20:43 +0000 Subject: SERVER-63495 Unyielding TransactionRouter never fails except at shutdown and support yielding in cluster commands --- src/mongo/s/async_requests_sender.cpp | 14 +- src/mongo/s/async_requests_sender_test.cpp | 4 +- .../s/commands/cluster_find_and_modify_cmd.cpp | 15 +- src/mongo/s/commands/cluster_write_cmd.cpp | 8 +- src/mongo/s/commands/strategy.cpp | 21 +- ...multi_statement_transaction_requests_sender.cpp | 16 +- src/mongo/s/query/router_stage_merge.h | 6 +- src/mongo/s/session_catalog_router.cpp | 14 +- src/mongo/s/session_catalog_router.h | 2 +- src/mongo/s/transaction_router.cpp | 63 +++++- src/mongo/s/transaction_router.h | 22 +- .../s/transaction_router_resource_yielder.cpp | 33 ++- src/mongo/s/transaction_router_resource_yielder.h | 20 +- src/mongo/s/transaction_router_test.cpp | 246 ++++++++++++++++++--- 14 files changed, 389 insertions(+), 95 deletions(-) (limited to 'src/mongo/s') diff --git a/src/mongo/s/async_requests_sender.cpp b/src/mongo/s/async_requests_sender.cpp index aae48dbb1ab..1fceefc428e 100644 --- a/src/mongo/s/async_requests_sender.cpp +++ b/src/mongo/s/async_requests_sender.cpp @@ -117,13 +117,15 @@ AsyncRequestsSender::Response AsyncRequestsSender::next() noexcept { _resourceYielder->yield(_opCtx); } - // Only wait for the next result without popping it, so an error unyielding doesn't discard - // an already popped response. - _responseQueue.waitForNonEmpty(_opCtx); + // Only wait for the next result without popping it, so an error unyielding doesn't + // discard an already popped response. + auto waitStatus = _responseQueue.waitForNonEmptyNoThrow(_opCtx); - if (_resourceYielder) { - _resourceYielder->unyield(_opCtx); - } + auto unyieldStatus = + _resourceYielder ? 
_resourceYielder->unyieldNoThrow(_opCtx) : Status::OK(); + + uassertStatusOK(waitStatus); + uassertStatusOK(unyieldStatus); // There should always be a response ready after the wait above. auto response = _responseQueue.tryPop(); diff --git a/src/mongo/s/async_requests_sender_test.cpp b/src/mongo/s/async_requests_sender_test.cpp index 28be4bfa668..39874e7e898 100644 --- a/src/mongo/s/async_requests_sender_test.cpp +++ b/src/mongo/s/async_requests_sender_test.cpp @@ -214,7 +214,7 @@ TEST_F(AsyncRequestsSenderTest, HandlesExceptionWhenUnyielding) { future.default_timed_get(); } -TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingSkipsUnyield) { +TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingDoesNotSkipUnyield) { class CountingResourceYielder : public ResourceYielder { public: void yield(OperationContext*) { @@ -258,7 +258,7 @@ TEST_F(AsyncRequestsSenderTest, ExceptionWhileWaitingSkipsUnyield) { future.default_timed_get(); ASSERT_EQ(yielderPointer->timesYielded, 1); - ASSERT_EQ(yielderPointer->timesUnyielded, 0); + ASSERT_EQ(yielderPointer->timesUnyielded, 1); } } // namespace } // namespace mongo diff --git a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp index 5931b4b1d73..501a996c84b 100644 --- a/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp +++ b/src/mongo/s/commands/cluster_find_and_modify_cmd.cpp @@ -148,10 +148,8 @@ void handleWouldChangeOwningShardErrorRetryableWrite( const NamespaceString& nss, const write_ops::FindAndModifyCommandRequest& request, BSONObjBuilder* result) { - auto txn = - txn_api::TransactionWithRetries(opCtx, - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); + auto txn = txn_api::TransactionWithRetries( + opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr); // Shared state for the transaction API use below. 
struct SharedBlock { @@ -245,10 +243,11 @@ void handleWouldChangeOwningShardErrorTransaction( WouldChangeOwningShardInfo::parseFromCommandError(extraInfo), nss); try { - auto txn = - txn_api::TransactionWithRetries(opCtx, - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); + auto txn = txn_api::TransactionWithRetries( + opCtx, + Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), + TransactionRouterResourceYielder::makeForLocalHandoff()); + txn.runSync(opCtx, [sharedBlock](const txn_api::TransactionClient& txnClient, diff --git a/src/mongo/s/commands/cluster_write_cmd.cpp b/src/mongo/s/commands/cluster_write_cmd.cpp index e7fb4233e00..af64f184f90 100644 --- a/src/mongo/s/commands/cluster_write_cmd.cpp +++ b/src/mongo/s/commands/cluster_write_cmd.cpp @@ -156,10 +156,8 @@ void handleWouldChangeOwningShardErrorRetryableWrite(OperationContext* opCtx, // Unset error details because they will be repopulated below. response->unsetErrDetails(); - auto txn = - txn_api::TransactionWithRetries(opCtx, - Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); + auto txn = txn_api::TransactionWithRetries( + opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), nullptr); // Shared state for the transaction API use below. 
struct SharedBlock { @@ -238,7 +236,7 @@ UpdateShardKeyResult handleWouldChangeOwningShardErrorTransaction( auto txn = txn_api::TransactionWithRetries(opCtx, Grid::get(opCtx)->getExecutorPool()->getFixedExecutor(), - TransactionRouterResourceYielder::make()); + TransactionRouterResourceYielder::makeForLocalHandoff()); try { txn.runSync(opCtx, diff --git a/src/mongo/s/commands/strategy.cpp b/src/mongo/s/commands/strategy.cpp index 379d0f5365d..edefcf546fe 100644 --- a/src/mongo/s/commands/strategy.cpp +++ b/src/mongo/s/commands/strategy.cpp @@ -199,13 +199,10 @@ Future invokeInTransactionRouter(std::shared_ptr auto opCtx = rec->getOpCtx(); - auto txnRouter = TransactionRouter::get(opCtx); - if (!txnRouter) { - // The command had yielded its session while the error was thrown. - return; + // Abort if the router wasn't yielded, which may happen at global shutdown. + if (auto txnRouter = TransactionRouter::get(opCtx)) { + txnRouter.implicitlyAbortTransaction(opCtx, status); } - - TransactionRouter::get(opCtx).implicitlyAbortTransaction(opCtx, status); }); } @@ -443,8 +440,6 @@ private: // Logs and updates statistics if an error occurs. void _tapOnError(const Status& status); - void _tapOnSuccess(); - ParseAndRunCommand* const _parc; boost::optional _routerSession; @@ -1085,15 +1080,6 @@ void ParseAndRunCommand::RunInvocation::_tapOnError(const Status& status) { _parc->_errorBuilder->appendElements(errorLabels); } -void ParseAndRunCommand::RunInvocation::_tapOnSuccess() { - auto opCtx = _parc->_rec->getOpCtx(); - if (_parc->_osi && _parc->_osi->getAutocommit() == boost::optional(false)) { - tassert(5918604, - "A successful transaction command must always check out its session after yielding", - TransactionRouter::get(opCtx)); - } -} - Future ParseAndRunCommand::RunAndRetry::run() { return makeReadyFutureWith([&] { // Try kMaxNumStaleVersionRetries times. On the last try, exceptions are rethrown. 
@@ -1134,7 +1120,6 @@ Future ParseAndRunCommand::RunInvocation::run() { return future_util::makeState(_parc).thenWithState( [](auto* runner) { return runner->run(); }); }) - .tap([this]() { _tapOnSuccess(); }) .tapError([this](Status status) { _tapOnError(status); }); } diff --git a/src/mongo/s/multi_statement_transaction_requests_sender.cpp b/src/mongo/s/multi_statement_transaction_requests_sender.cpp index d6f1341584c..c62b4e97da4 100644 --- a/src/mongo/s/multi_statement_transaction_requests_sender.cpp +++ b/src/mongo/s/multi_statement_transaction_requests_sender.cpp @@ -33,6 +33,7 @@ #include "mongo/db/operation_context.h" #include "mongo/s/transaction_router.h" +#include "mongo/s/transaction_router_resource_yielder.h" namespace mongo { @@ -81,13 +82,14 @@ MultiStatementTransactionRequestsSender::MultiStatementTransactionRequestsSender const ReadPreferenceSetting& readPreference, Shard::RetryPolicy retryPolicy) : _opCtx(opCtx), - _ars(std::make_unique(opCtx, - std::move(executor), - dbName, - attachTxnDetails(opCtx, requests), - readPreference, - retryPolicy, - nullptr /* resourceYielder */)) {} + _ars(std::make_unique( + opCtx, + std::move(executor), + dbName, + attachTxnDetails(opCtx, requests), + readPreference, + retryPolicy, + TransactionRouterResourceYielder::makeForRemoteCommand())) {} MultiStatementTransactionRequestsSender::~MultiStatementTransactionRequestsSender() { invariant(_opCtx); diff --git a/src/mongo/s/query/router_stage_merge.h b/src/mongo/s/query/router_stage_merge.h index 72f1aec26fb..c095f1b146a 100644 --- a/src/mongo/s/query/router_stage_merge.h +++ b/src/mongo/s/query/router_stage_merge.h @@ -35,6 +35,7 @@ #include "mongo/s/query/blocking_results_merger.h" #include "mongo/s/query/cluster_client_cursor_params.h" #include "mongo/s/query/router_exec_stage.h" +#include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/util/net/hostandport.h" namespace mongo { @@ -49,7 +50,10 @@ public: std::shared_ptr executor, 
AsyncResultsMergerParams&& armParams) : RouterExecStage(opCtx), - _resultsMerger(opCtx, std::move(armParams), std::move(executor), nullptr) {} + _resultsMerger(opCtx, + std::move(armParams), + std::move(executor), + TransactionRouterResourceYielder::makeForRemoteCommand()) {} StatusWith next() final { return _resultsMerger.next(getOpCtx()); diff --git a/src/mongo/s/session_catalog_router.cpp b/src/mongo/s/session_catalog_router.cpp index 1207f371dbf..e8b9470ca17 100644 --- a/src/mongo/s/session_catalog_router.cpp +++ b/src/mongo/s/session_catalog_router.cpp @@ -73,16 +73,20 @@ RouterOperationContextSession::RouterOperationContextSession(OperationContext* o RouterOperationContextSession::~RouterOperationContextSession() { if (auto txnRouter = TransactionRouter::get(_opCtx)) { - // Only stash if the session wasn't yielded. - txnRouter.stash(_opCtx); + // Only stash if the session wasn't yielded. This should only happen at global shutdown. + txnRouter.stash(_opCtx, TransactionRouter::StashReason::kDone); } }; -void RouterOperationContextSession::checkIn(OperationContext* opCtx) { +void RouterOperationContextSession::checkIn(OperationContext* opCtx, + OperationContextSession::CheckInReason reason) { invariant(OperationContextSession::get(opCtx)); - TransactionRouter::get(opCtx).stash(opCtx); - OperationContextSession::checkIn(opCtx); + TransactionRouter::get(opCtx).stash(opCtx, + reason == OperationContextSession::CheckInReason::kYield + ? 
TransactionRouter::StashReason::kYield + : TransactionRouter::StashReason::kDone); + OperationContextSession::checkIn(opCtx, reason); } void RouterOperationContextSession::checkOut(OperationContext* opCtx) { diff --git a/src/mongo/s/session_catalog_router.h b/src/mongo/s/session_catalog_router.h index 301b40fffca..ecb4771d3c7 100644 --- a/src/mongo/s/session_catalog_router.h +++ b/src/mongo/s/session_catalog_router.h @@ -68,7 +68,7 @@ public: * Check-in may only be called if the session has actually been checked out previously and * similarly check-out may only be called if the session is not checked out already. */ - static void checkIn(OperationContext* opCtx); + static void checkIn(OperationContext* opCtx, OperationContextSession::CheckInReason reason); static void checkOut(OperationContext* opCtx); private: diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp index e2739a36778..10ecdb05442 100644 --- a/src/mongo/s/transaction_router.cpp +++ b/src/mongo/s/transaction_router.cpp @@ -57,6 +57,7 @@ #include "mongo/s/router_transactions_metrics.h" #include "mongo/s/shard_cannot_refresh_due_to_locks_held_exception.h" #include "mongo/util/assert_util.h" +#include "mongo/util/exit.h" #include "mongo/util/fail_point.h" #include "mongo/util/log_with_sampling.h" #include "mongo/util/net/socket_utils.h" @@ -1040,13 +1041,18 @@ void TransactionRouter::Router::beginOrContinueTxn(OperationContext* opCtx, _updateLastClientInfo(opCtx->getClient()); } -void TransactionRouter::Router::stash(OperationContext* opCtx) { +void TransactionRouter::Router::stash(OperationContext* opCtx, StashReason reason) { if (!isInitialized()) { return; } - auto tickSource = opCtx->getServiceContext()->getTickSource(); stdx::lock_guard lk(*opCtx->getClient()); + + if (reason == StashReason::kYield) { + ++o(lk).activeYields; + } + + auto tickSource = opCtx->getServiceContext()->getTickSource(); o(lk).metricsTracker->trySetInactive(tickSource, tickSource->getTicks()); } 
@@ -1055,11 +1061,21 @@ void TransactionRouter::Router::unstash(OperationContext* opCtx) { return; } - // Validate that the transaction number hasn't changed. - if (o().txnNumberAndRetryCounter.getTxnNumber() != opCtx->getTxnNumber()) { - uasserted(ErrorCodes::NoSuchTransaction, - "The requested operation has a different transaction number than the active " - "transaction."); + // Validate that the transaction number hasn't changed while we were yielded. This is guaranteed + // by the activeYields check when beginning a new transaction. + invariant(opCtx->getTxnNumber(), "Cannot unstash without a transaction number"); + invariant(o().txnNumberAndRetryCounter.getTxnNumber() == opCtx->getTxnNumber(), + str::stream() + << "The requested operation has a different transaction number than the active " + "transaction. Active: " + << o().txnNumberAndRetryCounter.getTxnNumber() + << ", operation: " << *opCtx->getTxnNumber()); + + { + stdx::lock_guard lg(*opCtx->getClient()); + --o(lg).activeYields; + invariant(o(lg).activeYields >= 0, + str::stream() << "Invalid activeYields: " << o(lg).activeYields); } auto tickSource = opCtx->getServiceContext()->getTickSource(); @@ -1242,12 +1258,29 @@ BSONObj TransactionRouter::Router::_commitTransaction( return _handOffCommitToCoordinator(opCtx); } +// Returns if the opCtx has yielded its session and failed to unyield it, which may happen during +// methods that send network requests at global shutdown when running on a mongod. +bool failedToUnyieldSessionAtShutdown(OperationContext* opCtx) { + if (!TransactionRouter::get(opCtx)) { + invariant(globalInShutdownDeprecated()); + return true; + } + return false; +} + BSONObj TransactionRouter::Router::abortTransaction(OperationContext* opCtx) { invariant(isInitialized()); // Update stats on scope exit so the transaction is considered "active" while waiting on abort // responses. 
- ScopeGuard updateStatsGuard([&] { _onExplicitAbort(opCtx); }); + ScopeGuard updateStatsGuard([&] { + if (failedToUnyieldSessionAtShutdown(opCtx)) { + // It's unsafe to continue without the session checked out. This should only happen at + // global shutdown, so it's acceptable to skip updating stats. + return; + } + _onExplicitAbort(opCtx); + }); // The router has yet to send any commands to a remote shard for this transaction. // Return the same error that would have been returned by a shard. @@ -1325,7 +1358,14 @@ void TransactionRouter::Router::implicitlyAbortTransaction(OperationContext* opC // Update stats on scope exit so the transaction is considered "active" while waiting on abort // responses. - ScopeGuard updateStatsGuard([&] { _onImplicitAbort(opCtx, status); }); + ScopeGuard updateStatsGuard([&] { + if (failedToUnyieldSessionAtShutdown(opCtx)) { + // It's unsafe to continue without the session checked out. This should only happen at + // global shutdown, so it's acceptable to skip updating stats. 
+ return; + } + _onImplicitAbort(opCtx, status); + }); if (o().participants.empty()) { return; @@ -1396,6 +1436,11 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con void TransactionRouter::Router::_resetRouterState( OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) { stdx::lock_guard lk(*opCtx->getClient()); + + uassert(ErrorCodes::ConflictingOperationInProgress, + "Cannot start a new transaction while the previous is yielded", + o(lk).activeYields == 0); + o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber()); o(lk).txnNumberAndRetryCounter.setTxnRetryCounter( *txnNumberAndRetryCounter.getTxnRetryCounter()); diff --git a/src/mongo/s/transaction_router.h b/src/mongo/s/transaction_router.h index 276a473aead..a7b49c5eb12 100644 --- a/src/mongo/s/transaction_router.h +++ b/src/mongo/s/transaction_router.h @@ -71,6 +71,9 @@ public: kRecoverWithToken, }; + // The reason why TransactionRouter::Router::stash() is called. + enum class StashReason { kDone, kYield }; + // The default value to use as the statement id of the first command in the transaction if none // was sent. static const StmtId kDefaultFirstStmtId = 0; @@ -326,6 +329,15 @@ public: return o().txnNumberAndRetryCounter.getTxnNumber() != kUninitializedTxnNumber; } + /** + * Returns if this TransactionRouter instance can be reaped. Always true unless an operation + * has yielded the router and has not unyielded yet. We cannot reap the instance in that + * case or the unyield would check out a different TransactionRouter than it yielded. + */ + auto canBeReaped() const { + return o().activeYields == 0; + } + protected: explicit Observer(TransactionRouter* tr) : _tr(tr) {} @@ -372,9 +384,10 @@ public: TransactionActions action); /** - * Updates transaction diagnostics when the transaction's session is checked in. 
+ * Updates transaction diagnostics and, if necessary, the number of active yielders when the + * transaction's session is checked in. */ - void stash(OperationContext* opCtx); + void stash(OperationContext* opCtx, StashReason reason); /** * Validates transaction state is still compatible after a yield. @@ -775,6 +788,11 @@ private: // certain transaction events. Unset until the transaction router has processed at least one // transaction command. boost::optional metricsTracker; + + // How many operations that checked out the router's session have currently yielded it. The + // transaction number cannot be changed until this returns to 0, otherwise we cannot + // guarantee that unyielding the session cannot fail. + int32_t activeYields{0}; } _o; /** diff --git a/src/mongo/s/transaction_router_resource_yielder.cpp b/src/mongo/s/transaction_router_resource_yielder.cpp index 252231483a4..6d5a5e0c2a4 100644 --- a/src/mongo/s/transaction_router_resource_yielder.cpp +++ b/src/mongo/s/transaction_router_resource_yielder.cpp @@ -30,25 +30,52 @@ #include "mongo/s/transaction_router_resource_yielder.h" #include "mongo/db/session_catalog.h" +#include "mongo/s/is_mongos.h" #include "mongo/s/session_catalog_router.h" +#include "mongo/util/exit.h" namespace mongo { -std::unique_ptr TransactionRouterResourceYielder::make() { +std::unique_ptr +TransactionRouterResourceYielder::makeForLocalHandoff() { + return std::make_unique(); +} + +std::unique_ptr +TransactionRouterResourceYielder::makeForRemoteCommand() { + if (isMongos()) { + // Mongos cannot target itself so it does not need to yield for remote commands. 
+ return nullptr; + } return std::make_unique(); } void TransactionRouterResourceYielder::yield(OperationContext* opCtx) { Session* const session = OperationContextSession::get(opCtx); if (session) { - RouterOperationContextSession::checkIn(opCtx); + RouterOperationContextSession::checkIn(opCtx, + OperationContextSession::CheckInReason::kYield); } _yielded = (session != nullptr); } void TransactionRouterResourceYielder::unyield(OperationContext* opCtx) { if (_yielded) { - RouterOperationContextSession::checkOut(opCtx); + // Code that uses the TransactionRouter assumes it will only run with it, so check back out + // the session ignoring interruptions, except at global shutdown to prevent stalling + // shutdown. Unyield should always run with no resources held, so there shouldn't be a risk + // of deadlock. + try { + opCtx->runWithoutInterruptionExceptAtGlobalShutdown( + [&] { RouterOperationContextSession::checkOut(opCtx); }); + } catch (const DBException&) { + // This can throw at global shutdown, so calling code that catches errors may + // unexpectedly run without a session checked out. This is assumed safe because the + // process is shutting down and can't do any meaningful work. This invariant is to + // safeguard that assumption. + invariant(globalInShutdownDeprecated()); + throw; + } } } diff --git a/src/mongo/s/transaction_router_resource_yielder.h b/src/mongo/s/transaction_router_resource_yielder.h index 173c588c73a..7334d76b178 100644 --- a/src/mongo/s/transaction_router_resource_yielder.h +++ b/src/mongo/s/transaction_router_resource_yielder.h @@ -41,9 +41,22 @@ namespace mongo { class TransactionRouterResourceYielder : public ResourceYielder { public: /** - * Returns a newly allocated yielder. + * The next two methods return a newly allocated yielder for the given yielding scenario. + * + * Use makeForLocalHandoff() when the operation expected to check out the session next is in the + * same local process, e.g. 
a mongos command in a transaction is yielding so a command spawned + * by the transaction API can check out the same session. + * + * Use makeForRemoteCommand() when yielding waiting for a possibly remote response to be + * received. If the local process serves both a router and mongod role, the local process may + * need to check out the same session to produce the response. + * + * The only difference in behavior is that makeForLocalHandoff() always returns a yielder and + * makeForRemoteCommand() returns nullptr if the local process is a mongos because a mongos + * cannot target itself. */ - static std::unique_ptr make(); + static std::unique_ptr makeForLocalHandoff(); + static std::unique_ptr makeForRemoteCommand(); /** * If the opCtx has a checked out RouterOperationContextSession, yields it. @@ -52,7 +65,8 @@ public: /** * If the opCtx had previously checked out RouterOperationContextSession, checks it back out. - * Note this may throw if the opCtx has been interrupted. + * Note this may throw if the opCtx is interrupted at global shutdown. Otherwise it should never + * fail. 
*/ void unyield(OperationContext* opCtx) override; diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp index 89c3ad2675b..b132e05bdd7 100644 --- a/src/mongo/s/transaction_router_test.cpp +++ b/src/mongo/s/transaction_router_test.cpp @@ -233,6 +233,7 @@ private: TEST_F(TransactionRouterTestWithDefaultSession, StartTxnShouldBeAttachedOnlyOnFirstStatementToParticipant) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -271,6 +272,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -309,6 +311,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, BasicStartTxnWithAtClusterTime) TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( @@ -320,6 +323,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContiueTxnWithoutStarting) TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndReadConcern) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -386,6 +390,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NewParticipantMustAttachTxnAndRe TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -409,6 +414,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) } 
TxnNumber txnNum2{5}; + operationContext()->setTxnNumber(txnNum2); txnRouter.beginOrContinueTxn( operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -434,6 +440,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, StartingNewTxnShouldClearState) TEST_F(TransactionRouterTestWithDefaultSession, DoNotAttachTxnRetryCounterIfTxnRetryCounterIsDefault) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -452,6 +459,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -477,6 +485,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { } TxnNumber txnNum2{5}; + operationContext()->setTxnNumber(txnNum2); txnRouter.beginOrContinueTxn( operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -494,6 +503,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, FirstParticipantIsCoordinator) { TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForReadOnlyTransaction) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -536,6 +546,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardDoesNotGetSetForRea TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnFirstStatement) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -551,6 +562,7 @@ 
TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSingleParticipantIfSingleParticipantDoesWriteOnLaterStatement) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -572,6 +584,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsSetToSecondParticipantIfSecondParticipantIsFirstToDoAWrite) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -593,6 +606,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetIfRecoveryParticipantIsPendingAndPendingParticipantsAreCleared) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -617,6 +631,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsNotResetIfRecoveryParticipantIsNotPendingAndPendingParticipantsAreCleared) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -653,6 +668,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNewTransaction) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -675,6 +691,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RecoveryShardIsResetOnStartingNe TEST_F(TransactionRouterTestWithDefaultSession, DoesNotAttachTxnNumIfAlreadyThere) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = 
TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -702,6 +719,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, CrashesIfCmdHasDifferentTxnNumber, "invariant") { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -715,26 +733,38 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, << "txnNumber" << TxnNumber(10))); } -TEST_F(TransactionRouterTestWithDefaultSession, CannotUnstashWithDifferentTxnNumber) { +DEATH_TEST_F(TransactionRouterTestWithDefaultSession, + CannotUnstashWithDifferentTxnNumber, + "The requested operation has a different transaction number") { TxnNumber txnNum{3}; - operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); - // Simulates the user beginning another transaction while the previous one has not completed. 
+ operationContext()->setTxnNumber(txnNum + 1); + txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield); + txnRouter.unstash(operationContext()); +} + +DEATH_TEST_F(TransactionRouterTestWithDefaultSession, + CannotUnstashWithNoTxnNumber, + "Cannot unstash without a transaction number") { + TxnNumber txnNum{3}; + + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( - operationContext(), txnNum + 1, TransactionRouter::TransactionActions::kStart); + operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); - ASSERT_THROWS_CODE( - txnRouter.unstash(operationContext()), AssertionException, ErrorCodes::NoSuchTransaction); + txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield); + txnRouter.unstash(operationContext()); } TEST_F(TransactionRouterTestWithDefaultSession, SuccessfullyUnstashWithMatchingTxnNumber) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -742,11 +772,13 @@ TEST_F(TransactionRouterTestWithDefaultSession, SuccessfullyUnstashWithMatchingT txnRouter.setDefaultAtClusterTime(operationContext()); operationContext()->setTxnNumber(txnNum); + txnRouter.stash(operationContext(), TransactionRouter::StashReason::kYield); txnRouter.unstash(operationContext()); } TEST_F(TransactionRouterTestWithDefaultSession, AttachTxnValidatesReadConcernIfAlreadyOnCmd) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -778,6 +810,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SameAPIParametersAfterFirstState apiParameters.setAPIVersion("1"); APIParameters::get(operationContext()) = apiParameters; TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = 
TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -795,6 +828,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, DifferentAPIParametersAfterFirst apiParameters.setAPIVersion("1"); APIParameters::get(operationContext()) = apiParameters; TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -816,6 +850,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme apiParameters.setAPIVersion("1"); APIParameters::get(operationContext()) = apiParameters; TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -834,6 +869,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NoAPIParametersAfterFirstStateme TEST_F(TransactionRouterTestWithDefaultSession, CannotSpecifyReadConcernAfterFirstStatement) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -851,6 +887,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, PassesThroughEmptyReadConcernToP repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -877,6 +914,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, repl::ReadConcernArgs(kAfterClusterTime, boost::none); TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -902,6 +940,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedReadConcernLeve repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(readConcernLevel); TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto 
txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( @@ -917,6 +956,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter repl::ReadConcernArgs(LogicalTime(Timestamp(10, 1)), readConcernLevel); TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( @@ -932,6 +972,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter repl::ReadConcernArgs(repl::OpTime(Timestamp(10, 1), 2), readConcernLevel); TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); ASSERT_THROWS_CODE( txnRouter.beginOrContinueTxn( @@ -943,6 +984,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, RejectUnsupportedLevelsWithAfter TEST_F(TransactionRouterTestWithDefaultSession, CannotCommitWithoutParticipantsOrRecoveryToken) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -985,6 +1027,7 @@ void checkWriteConcern(const BSONObj& cmdObj, const WriteConcernOptions& expecte TEST_F(TransactionRouterTestWithDefaultSession, CommitTransactionWithNoParticipantsDoesNotSendCommit) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1006,6 +1049,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForSingleParticipantThatIsReadOnly) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1042,6 +1086,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, 
SendCommitDirectlyForSingleParticipantThatDidAWrite) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1078,6 +1123,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCommitDirectlyForMultipleParticipantsThatAreAllReadOnly) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1128,6 +1174,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsOnlyOneDidAWrite) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1176,6 +1223,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SendCoordinateCommitForMultipleParticipantsMoreThanOneDidAWrite) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1291,6 +1339,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithNoParticipants) { TEST_F(TransactionRouterTestWithDefaultSession, CrossShardTxnCommitWorksAfterRecoveryCommitForPreviousTransaction) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); opCtx->setTxnNumber(txnNum); @@ -1333,6 +1382,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, // should be sent with the correct participant list. 
{ ++txnNum; + operationContext()->setTxnNumber(txnNum); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1380,6 +1430,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, RouterShouldWorkAsRecoveryRouterEvenIfItHasSeenPreviousTransactions) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); opCtx->setTxnNumber(txnNum); @@ -1439,7 +1490,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, // participants. { ++txnNum; - + operationContext()->setTxnNumber(txnNum); txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kCommit); TxnRecoveryToken recoveryToken; @@ -1524,6 +1575,7 @@ TEST_F(TransactionRouterTest, CommitWithRecoveryTokenWithUnknownShard) { TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1572,6 +1624,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsResetAtClusterTime TEST_F(TransactionRouterTestWithDefaultSession, CannotChangeAtClusterTimeAfterStatementThatSelectedIt) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1633,6 +1686,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipants) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1683,6 +1737,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, SnapshotErrorsClearsAllParticipa TEST_F(TransactionRouterTestWithDefaultSession, 
CannotContinueOnSnapshotErrorAfterFirstCommand) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1706,6 +1761,8 @@ TEST_F(TransactionRouterTestWithDefaultSession, CannotContinueOnSnapshotErrorAft TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreatedAt) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); + auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( operationContext(), txnNum, TransactionRouter::TransactionActions::kStart); @@ -1738,6 +1795,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(repl::ReadConcernLevel::kSnapshotReadConcern); TxnNumber txnNum2{5}; + operationContext()->setTxnNumber(txnNum2); txnRouter.beginOrContinueTxn( operationContext(), txnNum2, TransactionRouter::TransactionActions::kStart); txnRouter.setDefaultAtClusterTime(operationContext()); @@ -1759,6 +1817,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ParticipantsRememberStmtIdCreate TEST_F(TransactionRouterTestWithDefaultSession, AllParticipantsAndCoordinatorClearedOnStaleErrorOnFirstCommand) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1804,6 +1863,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClearedOnStaleError) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1848,6 +1908,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, OnlyNewlyCreatedParticipantsClea TEST_F(TransactionRouterTestWithDefaultSession, RetriesCannotPickNewAtClusterTimeOnStatementAfterSelected) { TxnNumber 
txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -1901,6 +1962,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, WritesCanOnlyBeRetriedIfFirstOve auto otherCmds = {"find", "distinct", "aggregate", "killCursors", "getMore"}; TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2127,6 +2189,7 @@ TEST_F(TransactionRouterTest, AbortForMultipleParticipantsSomeReturnNetworkError TEST_F(TransactionRouterTestWithDefaultSession, OnViewResolutionErrorClearsAllNewParticipants) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2304,6 +2367,7 @@ TEST_F(TransactionRouterTest, ImplicitAbortIgnoresErrors) { TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); @@ -2332,6 +2396,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, AbortPropagatesWriteConcern) { TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirstOp) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2384,6 +2449,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinueOnlyOnStaleVersionOnFirs TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsReadConcernOnOpCtx) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2401,6 +2467,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, ContinuingTransactionPlacesItsRe TEST_F(TransactionRouterTestWithDefaultSession, 
SubsequentStatementCanSelectAtClusterTimeIfNotSelectedYet) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2547,6 +2614,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, NonSnapshotReadConcernLevelsPres TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesIgnoresNoSuchTransaction) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2577,6 +2645,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesUsesIdempotentRetryPolicy) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2609,6 +2678,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, AbortBetweenStatementRetriesFailsWithNoSuchTransactionOnUnexpectedErrors) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2641,6 +2711,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseInvariantsIfParticipantDoesNotExist, "Participant should exist if processing participant response") { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2658,6 +2729,7 @@ DEATH_TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseDoesNotUpdateParticipantIfResponseStatusIsNotOk) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2673,6 +2745,7 @@ 
TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseMarksParticipantAsReadOnlyIfResponseSaysReadOnlyTrue) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2698,6 +2771,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseMarksParticipantAsNotReadOnlyIfFirstResponseSaysReadOnlyFalse) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2723,6 +2797,7 @@ TEST_F( TransactionRouterTestWithDefaultSession, ProcessParticipantResponseUpdatesParticipantToReadOnlyFalseIfLaterResponseSaysReadOnlyFalse) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2749,6 +2824,7 @@ TEST_F( TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseThrowsIfParticipantClaimsToChangeFromReadOnlyFalseToReadOnlyTrue) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2774,6 +2850,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantResponseThrowsIfParticipantReturnsErrorThenSuccessOnLaterStatement) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto txnRouter = TransactionRouter::get(operationContext()); txnRouter.beginOrContinueTxn( @@ -2811,6 +2888,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantSkipsValidationIfAbortAlreadyInitiated) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); auto 
txnRouter = TransactionRouter::get(opCtx); @@ -2837,6 +2915,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantSkipsValidationIfImplicitAbortAlreadyInitiated) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); @@ -2859,6 +2938,7 @@ TEST_F(TransactionRouterTestWithDefaultSession, TEST_F(TransactionRouterTestWithDefaultSession, ProcessParticipantSkipsValidationIfCommitAlreadyInitiated) { TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); auto opCtx = operationContext(); auto txnRouter = TransactionRouter::get(opCtx); @@ -2884,6 +2964,111 @@ TEST_F(TransactionRouterTestWithDefaultSession, txnRouter.processParticipantResponse(operationContext(), shard1, kOkReadOnlyTrueResponse); } +TEST_F(TransactionRouterTestWithDefaultSession, CannotAdvanceTxnNumberWithActiveYielder) { + TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); + auto opCtx = operationContext(); + + auto txnRouter = TransactionRouter::get(opCtx); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + + txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield); + + // We can beginOrContinueTxn at the active txnNumber. This simulates a yielded session being + // checked out by a different operation for that same transaction. + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue); + + txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield); + + // A higher txnNumber cannot be used. 
+ ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn( + opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); + + ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn( + opCtx, txnNum + 1, TransactionRouter::TransactionActions::kCommit), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); + + TransactionRouter::get(opCtx).unstash(opCtx); + + // A higher txnNumber still cannot be used because there is an outstanding yielder. + ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn( + opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); + + ASSERT_THROWS_CODE(txnRouter.beginOrContinueTxn( + opCtx, txnNum + 1, TransactionRouter::TransactionActions::kCommit), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); + + TransactionRouter::get(opCtx).unstash(opCtx); + + // A non-yielding stash does not affect whether a new transaction can begin. This simulates an + // operation completing. Stash multiple times to verify it does not bring the activeYields + // counter below 0 and trigger a crash. + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + + // Now the router can advance. 
+ txnRouter.beginOrContinueTxn(opCtx, txnNum + 1, TransactionRouter::TransactionActions::kStart); +} + +DEATH_TEST_F(TransactionRouterTestWithDefaultSession, + ActiveYieldersCannotGoBelowZero, + "Invalid activeYields") { + TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); + auto opCtx = operationContext(); + + auto txnRouter = TransactionRouter::get(opCtx); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + + txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield); + txnRouter.unstash(opCtx); + txnRouter.unstash(opCtx); +} + +TEST_F(TransactionRouterTestWithDefaultSession, CannotBeReapedWithActiveYielders) { + TxnNumber txnNum{3}; + operationContext()->setTxnNumber(txnNum); + auto opCtx = operationContext(); + + auto txnRouter = TransactionRouter::get(opCtx); + + // Can be reaped initially. + ASSERT(txnRouter.canBeReaped()); + + // Can be reaped after a non-yield stash. + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kStart); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + ASSERT(txnRouter.canBeReaped()); + + // Cannot be reaped after a yield stash. + repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield); + ASSERT_FALSE(txnRouter.canBeReaped()); + + // Cannot be reaped after multiple yield stashes until all corresponding unstashes. 
+ repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + txnRouter.beginOrContinueTxn(opCtx, txnNum, TransactionRouter::TransactionActions::kContinue); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kYield); + ASSERT_FALSE(txnRouter.canBeReaped()); + + txnRouter.unstash(opCtx); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + ASSERT_FALSE(txnRouter.canBeReaped()); + + txnRouter.unstash(opCtx); + txnRouter.stash(opCtx, TransactionRouter::StashReason::kDone); + ASSERT(txnRouter.canBeReaped()); +} + // Begins a transaction with snapshot level read concern and sets a default cluster time. class TransactionRouterTestWithDefaultSessionAndStartedSnapshot : public TransactionRouterTestWithDefaultSession { @@ -2951,6 +3136,8 @@ protected: void setUp() override { TransactionRouterTestWithDefaultSession::setUp(); repl::ReadConcernArgs::get(operationContext()) = repl::ReadConcernArgs(); + _txnRouter = TransactionRouter::get(operationContext()); + operationContext()->setTxnNumber(kTxnNumber); } TickSourceMock* tickSource() { @@ -2965,8 +3152,9 @@ protected: return dynamic_cast(getServiceContext()->getPreciseClockSource()); } - TransactionRouter::Router txnRouter() { - return TransactionRouter::get(operationContext()); + TransactionRouter::Router& txnRouter() { + invariant(_txnRouter); + return *_txnRouter; } void beginTxnWithDefaultTxnNumber() { @@ -3221,7 +3409,7 @@ protected: assertTimeActiveIs(Microseconds(timeActive)); assertTimeInactiveIs(Microseconds(timeInactive)); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); tickSource()->advance(Microseconds(150)); assertTimeActiveIs(Microseconds(timeActive)); @@ -3233,12 +3421,15 @@ protected: assertTimeActiveIs(kDefaultTimeActive); assertTimeInactiveIs(Microseconds(0)); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); 
tickSource()->advance(kDefaultTimeInactive); assertTimeActiveIs(kDefaultTimeActive); assertTimeInactiveIs(kDefaultTimeInactive); } + +private: + boost::optional<TransactionRouter::Router> _txnRouter; }; // @@ -3404,7 +3595,7 @@ TEST_F(TransactionRouterMetricsTest, SlowLoggingPrintsTimeActiveAndInactive) { tickSource()->advance(Microseconds(111111)); assertTimeActiveIs(Microseconds(111111)); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); tickSource()->advance(Microseconds(222222)); assertTimeInactiveIs(Microseconds(222222)); @@ -3926,6 +4117,7 @@ TEST_F(TransactionRouterMetricsTest, CommitDurationResetByNewTransaction) { future.default_timed_get(); // Start a new transaction and verify the commit duration was reset. + operationContext()->setTxnNumber(kTxnNumber + 1); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); @@ -4050,7 +4242,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceSeparatelyAndSu assertDurationIs(Microseconds(50)); // Only timeInactive will advance while a txn is stashed. - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); tickSource()->advance(Microseconds(100)); assertTimeActiveIs(Microseconds(50)); assertTimeInactiveIs(Microseconds(100)); @@ -4080,7 +4272,7 @@ TEST_F(TransactionRouterMetricsTest, StashIsIdempotent) { assertTimeActiveIs(kDefaultTimeActive); assertTimeInactiveIs(kDefaultTimeInactive); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); // Only timeInactive can advance.
@@ -4088,7 +4280,7 @@ TEST_F(TransactionRouterMetricsTest, StashIsIdempotent) { assertTimeActiveIs(kDefaultTimeActive); assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100)); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); // Still only timeInactive can advance. @@ -4106,7 +4298,7 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedStashedTxn) { assertTimeInactiveIs(kDefaultTimeInactive); assertDurationIs(kDefaultTimeActive + kDefaultTimeInactive); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); // At shutdown transactions are implicitly aborted without being continued so a transaction may // be stashed when aborting, which should still lead to durations in a consistent state. @@ -4153,7 +4345,7 @@ TEST_F(TransactionRouterMetricsTest, DurationsForImplicitlyAbortedEndedTxn) { txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kCommit); runCommit(kDummyOkRes); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); // At shutdown transactions are implicitly aborted without being continued, so an "ended" // transaction (i.e. committed or aborted) may be implicitly aborted again. This shouldn't @@ -4211,7 +4403,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom assertTimeInactiveIs(kDefaultTimeInactive); // timeInactive can advance. 
- txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); tickSource()->advance(Microseconds(100)); assertTimeActiveIs(kDefaultTimeActive + Microseconds(100)); assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100)); @@ -4226,7 +4418,7 @@ TEST_F(TransactionRouterMetricsTest, TimeActiveAndInactiveAdvanceAfterUnknownCom assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(100)); // timeInactive can advance. - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); tickSource()->advance(Microseconds(100)); assertTimeActiveIs(kDefaultTimeActive + Microseconds(200)); assertTimeInactiveIs(kDefaultTimeInactive + Microseconds(200)); @@ -4293,7 +4485,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginRecover) { TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) { beginRecoverCommitWithDefaultTxnNumber(); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); @@ -4301,7 +4493,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_Stash) { TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAfterStash) { beginRecoverCommitWithDefaultTxnNumber(); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber, TransactionRouter::TransactionActions::kContinue); @@ -4326,14 +4518,14 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_AreNotCumulative) { // Test inactive. 
txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 2, TransactionRouter::TransactionActions::kStart); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 3, TransactionRouter::TransactionActions::kStart); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); @@ -4357,7 +4549,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_UnknownCommit) { TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_ImplicitAbortForStashedTxn) { beginTxnWithDefaultTxnNumber(); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(1L, routerTxnMetrics()->getCurrentInactive()); @@ -4396,7 +4588,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCurrent_BeginAndStashForEndedT ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentInactive()); - txnRouter().stash(operationContext()); + txnRouter().stash(operationContext(), TransactionRouter::StashReason::kDone); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentOpen()); ASSERT_EQUALS(0L, routerTxnMetrics()->getCurrentActive()); @@ -4567,6 +4759,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalStarted_IsCumulative) { beginTxnWithDefaultTxnNumber(); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalStarted()); + 
operationContext()->setTxnNumber(kTxnNumber + 1); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalStarted()); @@ -4635,6 +4828,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalCommitted_IsCumulative) { runCommit(kDummyOkRes); ASSERT_EQUALS(1L, routerTxnMetrics()->getTotalCommitted()); + operationContext()->setTxnNumber(kTxnNumber + 1); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); runCommit(kDummyOkRes); @@ -4811,6 +5005,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsTotalParticipantsAtCommit) { ASSERT_EQUALS(2L, routerTxnMetrics()->getTotalParticipantsAtCommit()); // Is cumulative across transactions. + operationContext()->setTxnNumber(kTxnNumber + 1); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); txnRouter().attachTxnFieldsIfNeeded(operationContext(), shard1, {}); @@ -4893,6 +5088,7 @@ TEST_F(TransactionRouterMetricsTest, RouterMetricsCommitTypeStatsSuccessfulDurat .successfulDurationMicros.load()); // Start a new transaction and verify that successful commit duration is cumulative. + operationContext()->setTxnNumber(kTxnNumber + 1); txnRouter().beginOrContinueTxn( operationContext(), kTxnNumber + 1, TransactionRouter::TransactionActions::kStart); txnRouter().setDefaultAtClusterTime(operationContext()); -- cgit v1.2.1