diff options
-rw-r--r-- | src/mongo/db/SConscript | 28 | ||||
-rw-r--r-- | src/mongo/db/kill_sessions_local.cpp | 94 | ||||
-rw-r--r-- | src/mongo/db/kill_sessions_local.h | 3 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl.cpp | 14 | ||||
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 108 | ||||
-rw-r--r-- | src/mongo/db/pipeline/process_interface_standalone.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_state_transition_lock_guard.cpp | 3 | ||||
-rw-r--r-- | src/mongo/db/repl/rs_rollback.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/session.h | 38 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 98 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 48 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 55 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.h | 13 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 245 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 24 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 7 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 5 |
18 files changed, 646 insertions, 171 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 44f61615274..580b995e181 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -637,6 +637,21 @@ env.Library( ) env.Library( + target='session_catalog', + source=[ + 'session_catalog.cpp', + 'session.cpp', + ], + LIBDEPS=[ + 'kill_sessions', + 'logical_session_id', + ], + LIBDEPS_PRIVATE=[ + 'service_context', + ] +) + +env.Library( target='catalog_raii', source=[ 'catalog_raii.cpp', @@ -644,8 +659,6 @@ env.Library( 'retryable_writes_stats.cpp', 'server_transactions_metrics.cpp', 'session_catalog_mongod.cpp', - 'session_catalog.cpp', - 'session.cpp', 'single_transaction_stats.cpp', 'transaction_coordinator_factory.cpp', 'transaction_history_iterator.cpp', @@ -668,15 +681,14 @@ env.Library( 'curop_metrics', 'dbdirectclient', 'index/index_access_method', - 'kill_sessions', - 'logical_session_id', 'namespace_string', 'repl/oplog_entry', 'repl/oplog_shim', 's/sharding_api_d', + 'session_catalog', 'stats/fill_locker_info', + 'stats/top', 'views/views', - "stats/top", ], LIBDEPS_PRIVATE=[ 'commands/server_status', @@ -1855,8 +1867,6 @@ env.Library( 'service_context_devnull_test_fixture.cpp', ], LIBDEPS=[ - # this library is required only because of SERVER-29908 - '$BUILD_DIR/mongo/db/s/sharding_runtime_d', 'service_context_d_test_fixture', ], LIBDEPS_PRIVATE=[ @@ -1967,8 +1977,8 @@ env.CppUnitTest( ], LIBDEPS=[ 'auth/authmocks', - 'catalog_raii', - 'repl/mock_repl_coord_server_fixture', + 'service_context_d_test_fixture', + 'session_catalog', ], ) diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index d416b328552..2ea7b2b6f1d 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -46,29 +46,54 @@ namespace mongo { namespace { +/** + * Shortcut method shared by the various forms of session kill below. Every session kill operation + * consists of the following stages: + * 1) Select the sessions to kill, based on their lsid or owning user account (achieved through the + * 'matcher') and further refining that list through the 'filterFn'. + * 2) If any of the selected sessions are currently checked out, interrupt the owning operation + * context with 'reason' as the code. + * 3) Finish killing the selected and interrupted sessions through the 'killSessionFn'. + */ void killSessionsAction(OperationContext* opCtx, const SessionKiller::Matcher& matcher, - const SessionCatalog::ScanSessionsCallbackFn& killSessionFn) { + const stdx::function<bool(Session*)>& filterFn, + const stdx::function<void(Session*)>& killSessionFn, + ErrorCodes::Error reason = ErrorCodes::Interrupted) { const auto catalog = SessionCatalog::get(opCtx); - catalog->scanSessions(matcher, [&](Session* session) { + std::vector<Session::KillToken> sessionKillTokens; + catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) { + if (filterFn(session)) + sessionKillTokens.emplace_back(session->kill(sessionCatalogLock, reason)); + }); + + for (auto& sessionKillToken : sessionKillTokens) { + auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); + // TODO (SERVER-33850): Rename KillAllSessionsByPattern and // ScopedKillAllSessionsByPatternImpersonator to not refer to session kill const KillAllSessionsByPattern* pattern = matcher.match(session->getSessionId()); invariant(pattern); ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern); - killSessionFn(session); - }); + killSessionFn(session.get()); + } } } // namespace void killSessionsLocalKillTransactions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { - killSessionsAction(opCtx, matcher, [](Session* session) { - TransactionParticipant::getFromNonCheckedOutSession(session)->abortArbitraryTransaction(); - }); + const SessionKiller::Matcher& matcher, + ErrorCodes::Error reason) { + killSessionsAction(opCtx, + matcher, + [](Session*) { return true; }, + [](Session* session) { + TransactionParticipant::getFromNonCheckedOutSession(session) + ->abortArbitraryTransaction(); + }, + reason); } SessionKiller::Result killSessionsLocal(OperationContext* opCtx, @@ -86,23 +111,52 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx, void killAllExpiredTransactions(OperationContext* opCtx) { SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsAction(opCtx, matcherAllSessions, [](Session* session) { - try { - TransactionParticipant::getFromNonCheckedOutSession(session) - ->abortArbitraryTransactionIfExpired(); - } catch (const DBException& ex) { - warning() << "May have failed to abort expired transaction on session " - << session->getSessionId().getId() << " due to " << redact(ex.toStatus()); - } - }); + killSessionsAction( + opCtx, + matcherAllSessions, + [](Session* session) { + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session); + + return txnParticipant->expired(); + }, + [](Session* session) { + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session); + + LOG(0) + << "Aborting transaction with txnNumber " << txnParticipant->getActiveTxnNumber() + << " on session " << session->getSessionId().getId() + << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; + + // The try/catch block below is necessary because expired() in the filterFn above could + // return true for expired, but unprepared transaction, but by the time we get to + // actually kill it, the participant could theoretically become prepared (being under + // the SessionCatalog mutex doesn't prevent the concurrently running thread from doing + // preparing the participant). + // + // Then when the execution reaches the killSessionFn, it would find the transaction is + // prepared and not allowed to be killed, which would cause the exception below + try { + txnParticipant->abortArbitraryTransaction(); + } catch (const DBException& ex) { + warning() << "May have failed to abort expired transaction on session " + << session->getSessionId().getId() << " due to " << redact(ex.toStatus()); + } + }, + ErrorCodes::ExceededTimeLimit); } void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) { SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsAction(opCtx, matcherAllSessions, [](Session* session) { - TransactionParticipant::getFromNonCheckedOutSession(session)->shutdown(); - }); + killSessionsAction(opCtx, + matcherAllSessions, + [](Session*) { return true; }, + [](Session* session) { + TransactionParticipant::getFromNonCheckedOutSession(session)->shutdown(); + }, + ErrorCodes::InterruptedAtShutdown); } } // namespace mongo diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h index 3d6f0699cba..60cb150904a 100644 --- a/src/mongo/db/kill_sessions_local.h +++ b/src/mongo/db/kill_sessions_local.h @@ -48,7 +48,8 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx, * Kills all transactions on mongod for sessions matching 'matcher'. */ void killSessionsLocalKillTransactions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); + const SessionKiller::Matcher& matcher, + ErrorCodes::Error reason = ErrorCodes::Interrupted); /** * Aborts any expired transactions. diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp index a295be04d5a..1ccf72f16da 100644 --- a/src/mongo/db/op_observer_impl.cpp +++ b/src/mongo/db/op_observer_impl.cpp @@ -53,7 +53,7 @@ #include "mongo/db/repl/replication_coordinator.h" #include "mongo/db/s/collection_sharding_state.h" #include "mongo/db/server_options.h" -#include "mongo/db/session_catalog.h" +#include "mongo/db/session_catalog_mongod.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/views/durable_view_catalog.h" #include "mongo/scripting/engine.h" @@ -435,7 +435,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx, } } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) { for (auto it = first; it != last; it++) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc); + MongoDSessionCatalog::invalidateSessions(opCtx, it->doc); } } } @@ -504,7 +504,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updateArgs.updatedDoc); } else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.writeOpTime.isNull()) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updateArgs.updatedDoc); + MongoDSessionCatalog::invalidateSessions(opCtx, args.updateArgs.updatedDoc); } } @@ -570,7 +570,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx, uasserted(40670, "removing FeatureCompatibilityVersion document is not allowed"); } else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !opTime.writeOpTime.isNull()) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, documentKey); + MongoDSessionCatalog::invalidateSessions(opCtx, documentKey); } } @@ -730,7 +730,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string& 50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb); if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); + MongoDSessionCatalog::invalidateSessions(opCtx, boost::none); } NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName); @@ -770,7 +770,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx, if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) { DurableViewCatalog::onExternalChange(opCtx, collectionName); } else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); + MongoDSessionCatalog::invalidateSessions(opCtx, boost::none); } AuthorizationManager::get(opCtx->getServiceContext()) @@ -1145,7 +1145,7 @@ void OpObserverImpl::onReplicationRollback(OperationContext* opCtx, // If there were ops rolled back that were part of operations on a session, then invalidate // the session cache. if (rbInfo.rollbackSessionIds.size() > 0) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); + MongoDSessionCatalog::invalidateSessions(opCtx, boost::none); } // Reset the key manager cache. diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index abbb9b94754..dd4cbc4b021 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -345,62 +345,106 @@ public: } }; -TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) { - OpObserverImpl opObserver; - auto opCtx = cc().makeOperationContext(); +using OpObserverSessionCatalogRollbackTest = OpObserverSessionCatalogTest; + +TEST_F(OpObserverSessionCatalogRollbackTest, + OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) { const NamespaceString nss("testDB", "testColl"); // Create a session. auto sessionCatalog = SessionCatalog::get(getServiceContext()); auto sessionId = makeLogicalSessionIdForTest(); - auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId); - const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); - // Simulate a write occurring on that session. const TxnNumber txnNum = 0; const StmtId stmtId = 1000; - simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); - // Check that the statement executed. - ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + { + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(sessionId); + + // Create a session and sync it from disk + auto session = sessionCatalog->checkOutSession(opCtx.get()); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); + + // Simulate a write occurring on that session + simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); + + // Check that the statement executed + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + } // The OpObserver should invalidate in-memory session state, so the check after this should // fail. - OpObserver::RollbackObserverInfo rbInfo; - rbInfo.rollbackSessionIds = {UUID::gen()}; - opObserver.onReplicationRollback(opCtx.get(), rbInfo); - ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId), - DBException, - ErrorCodes::ConflictingOperationInProgress); + { + auto opCtx = cc().makeOperationContext(); + + OpObserverImpl opObserver; + OpObserver::RollbackObserverInfo rbInfo; + rbInfo.rollbackSessionIds = {UUID::gen()}; + opObserver.onReplicationRollback(opCtx.get(), rbInfo); + } + + { + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(sessionId); + + auto session = sessionCatalog->checkOutSession(opCtx.get()); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId), + DBException, + ErrorCodes::ConflictingOperationInProgress); + } } -TEST_F(OpObserverSessionCatalogTest, +TEST_F(OpObserverSessionCatalogRollbackTest, OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) { - OpObserverImpl opObserver; - auto opCtx = cc().makeOperationContext(); const NamespaceString nss("testDB", "testColl"); - // Create a session. auto sessionCatalog = SessionCatalog::get(getServiceContext()); auto sessionId = makeLogicalSessionIdForTest(); - auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId); - const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); - // Simulate a write occurring on that session. const TxnNumber txnNum = 0; const StmtId stmtId = 1000; - simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); - // Check that the statement executed. - ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + { + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(sessionId); + + // Create a session and sync it from disk + auto session = sessionCatalog->checkOutSession(opCtx.get()); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); - // The OpObserver should not invalidate the in-memory session state, so the check after this - // should still succeed. - OpObserver::RollbackObserverInfo rbInfo; - opObserver.onReplicationRollback(opCtx.get(), rbInfo); - ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + // Simulate a write occurring on that session + simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); + + // Check that the statement executed + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + } + + // Because there are no sessions to rollback, the OpObserver should not invalidate the in-memory + // session state, so the check after this should still succeed. + { + auto opCtx = cc().makeOperationContext(); + + OpObserverImpl opObserver; + OpObserver::RollbackObserverInfo rbInfo; + opObserver.onReplicationRollback(opCtx.get(), rbInfo); + } + + { + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(sessionId); + + auto session = sessionCatalog->checkOutSession(opCtx.get()); + const auto txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId)); + } } /** diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp index 75fb80d5183..660ee570b43 100644 --- a/src/mongo/db/pipeline/process_interface_standalone.cpp +++ b/src/mongo/db/pipeline/process_interface_standalone.cpp @@ -495,7 +495,7 @@ void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext sessionCatalog->scanSessions( {std::move(sessionFilter)}, - [&](Session* session) { + [&](WithLock sessionCatalogLock, Session* session) { auto op = TransactionParticipant::getFromNonCheckedOutSession(session)->reportStashedState(); if (!op.isEmpty()) { diff --git a/src/mongo/db/repl/replication_state_transition_lock_guard.cpp b/src/mongo/db/repl/replication_state_transition_lock_guard.cpp index b78fb7f290e..5e557b81857 100644 --- a/src/mongo/db/repl/replication_state_transition_lock_guard.cpp +++ b/src/mongo/db/repl/replication_state_transition_lock_guard.cpp @@ -55,7 +55,8 @@ ReplicationStateTransitionLockGuard::ReplicationStateTransitionLockGuard(Operati // Destroy all stashed transaction resources, in order to release locks. SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); - killSessionsLocalKillTransactions(opCtx, matcherAllSessions); + killSessionsLocalKillTransactions( + opCtx, matcherAllSessions, ErrorCodes::InterruptedDueToStepDown); } _globalLock->waitForLockUntil(args.lockDeadline); diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp index 33963224c53..9c76bbf5b18 100644 --- a/src/mongo/db/repl/rs_rollback.cpp +++ b/src/mongo/db/repl/rs_rollback.cpp @@ -1435,7 +1435,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx, // If necessary, clear the memory of existing sessions. if (fixUpInfo.refetchTransactionDocs) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); + MongoDSessionCatalog::invalidateSessions(opCtx, boost::none); } if (auto validator = LogicalTimeValidator::get(opCtx)) { diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index a190f7daf43..3936dac0478 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -32,6 +32,8 @@ #include "mongo/db/session.h" +#include "mongo/db/operation_context.h" + namespace mongo { Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {} @@ -41,6 +43,28 @@ OperationContext* Session::currentOperation() const { return _checkoutOpCtx; } +Session::KillToken Session::kill(WithLock sessionCatalogLock, ErrorCodes::Error reason) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + uassert(ErrorCodes::ConflictingOperationInProgress, "Session already killed", !_killRequested); + _killRequested = true; + + // For currently checked-out sessions, interrupt the operation context so that the current owner + // can release the session + if (_checkoutOpCtx) { + const auto serviceContext = _checkoutOpCtx->getServiceContext(); + + stdx::lock_guard<Client> clientLock(*_checkoutOpCtx->getClient()); + serviceContext->killOperation(_checkoutOpCtx, reason); + } + + return KillToken(getSessionId()); +} + +bool Session::killed() const { + stdx::lock_guard<stdx::mutex> lg(_mutex); + return _killRequested; +} + void Session::_markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); invariant(!_checkoutOpCtx); @@ -53,4 +77,10 @@ void Session::_markCheckedIn(WithLock sessionCatalogLock) { _checkoutOpCtx = nullptr; } +void Session::_markNotKilled(WithLock sessionCatalogLock, KillToken killToken) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + invariant(_killRequested); + _killRequested = false; +} + } // namespace mongo diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index e1e95741c11..f7349e0d29e 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -64,6 +64,35 @@ public: */ OperationContext* currentOperation() const; + /** + * Marks the session as killed and returns a 'kill token' to to be passed later on to + * 'checkOutSessionForKill' method of the SessionCatalog in order to permit the caller to + * execute any kill cleanup tasks and pass further on to '_markNotKilled' in order to reset the + * kill state. Marking session as killed is an internal property only that will cause any + * further calls to 'checkOutSession' to block until 'checkOutSessionForKill' is called and the + * returned scoped object destroyed. + * + * If the session is currently checked-out, this method will also interrupt the operation + * context which has it checked-out. + * + * If the session is already killed throws ConflictingOperationInProgress exception. + * + * Must be called under the owning SessionCatalog's lock. + */ + struct KillToken { + KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {} + KillToken(KillToken&&) = default; + KillToken& operator=(KillToken&&) = default; + + LogicalSessionId lsidToKill; + }; + KillToken kill(WithLock sessionCatalogLock, ErrorCodes::Error reason = ErrorCodes::Interrupted); + + /** + * Returns whether 'kill' has been called on this session. + */ + bool killed() const; + private: /** * Set/clear the current check-out state of the session by storing the operation which has this @@ -74,6 +103,12 @@ private: void _markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx); void _markCheckedIn(WithLock sessionCatalogLock); + /** + * Used by the session catalog when checking a session back in after a call to 'kill'. See the + * comments for 'kill for more details. + */ + void _markNotKilled(WithLock sessionCatalogLock, KillToken killToken); + // The id of the session with which this object is associated const LogicalSessionId _sessionId; @@ -87,6 +122,9 @@ private: // A pointer back to the currently running operation on this Session, or nullptr if there // is no operation currently running for the Session. OperationContext* _checkoutOpCtx{nullptr}; + + // Set to true if markKilled has been invoked for this session. + bool _killRequested{false}; }; } // namespace mongo diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index abc5165e5a3..70049de30a4 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -34,12 +34,8 @@ #include "mongo/db/session_catalog.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" -#include "mongo/db/transaction_participant.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" @@ -58,6 +54,7 @@ SessionCatalog::~SessionCatalog() { for (const auto& entry : _sessions) { auto& sri = entry.second; invariant(!sri->session.currentOperation()); + invariant(!sri->session.killed()); } } @@ -88,6 +85,31 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) stdx::unique_lock<stdx::mutex> ul(_mutex); auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + // Wait until the session is no longer checked out and until the previously scheduled kill has + // completed + opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() { + return !sri->session.currentOperation() && !sri->session.killed(); + }); + + sri->session._markCheckedOut(ul, opCtx); + + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */); +} + +ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, + Session::KillToken killToken) { + // This method is not supposed to be called with an already checked-out session due to risk of + // deadlock + invariant(!operationSessionDecoration(opCtx)); + invariant(!opCtx->getTxnNumber()); + + const auto lsid = killToken.lsidToKill; + + stdx::unique_lock<stdx::mutex> ul(_mutex); + auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); + invariant(sri->session.killed()); + // Wait until the session is no longer checked out opCtx->waitForConditionOrInterrupt( sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); }); @@ -95,7 +117,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) invariant(!sri->session.currentOperation()); sri->session._markCheckedOut(ul, opCtx); - return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri))); + return ScopedCheckedOutSession( + opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */); } ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, @@ -112,49 +135,6 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, return ss; } -void SessionCatalog::invalidateSessions(OperationContext* opCtx, - boost::optional<BSONObj> singleSessionDoc) { - auto replCoord = repl::ReplicationCoordinator::get(opCtx); - bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; - if (isReplSet) { - uassert(40528, - str::stream() << "Direct writes against " - << NamespaceString::kSessionTransactionsTableNamespace.ns() - << " cannot be performed using a transaction or on a session.", - !opCtx->getLogicalSessionId()); - } - - const auto invalidateSessionFn = [&](WithLock, decltype(_sessions)::iterator it) { - auto& sri = it->second; - auto const txnParticipant = - TransactionParticipant::getFromNonCheckedOutSession(&sri->session); - txnParticipant->invalidate(); - - // We cannot remove checked-out sessions from the cache, because operations expect to find - // them there to check back in - if (!sri->session.currentOperation()) { - _sessions.erase(it); - } - }; - - stdx::lock_guard<stdx::mutex> lg(_mutex); - - if (singleSessionDoc) { - const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"), - singleSessionDoc->getField("_id").Obj()); - - auto it = _sessions.find(lsid); - if (it != _sessions.end()) { - invalidateSessionFn(lg, it); - } - } else { - auto it = _sessions.begin(); - while (it != _sessions.end()) { - invalidateSessionFn(lg, it++); - } - } -} - void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn) { stdx::lock_guard<stdx::mutex> lg(_mutex); @@ -163,15 +143,22 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, for (auto& sessionEntry : _sessions) { if (matcher.match(sessionEntry.first)) { - workerFn(&sessionEntry.second->session); + workerFn(lg, &sessionEntry.second->session); } } } +Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + auto it = _sessions.find(lsid); + uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end()); + + auto& sri = it->second; + return sri->session.kill(lg); +} + std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - auto it = _sessions.find(lsid); if (it == _sessions.end()) { it = _sessions.emplace(lsid, std::make_shared<SessionRuntimeInfo>(lsid)).first; @@ -180,7 +167,8 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate return it->second; } -void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { +void SessionCatalog::_releaseSession(const LogicalSessionId& lsid, + boost::optional<Session::KillToken> killToken) { stdx::lock_guard<stdx::mutex> lg(_mutex); auto it = _sessions.find(lsid); @@ -191,6 +179,11 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) { sri->session._markCheckedIn(lg); sri->availableCondVar.notify_one(); + + if (killToken) { + invariant(sri->session.killed()); + sri->session._markNotKilled(lg, std::move(*killToken)); + } } OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession) @@ -236,6 +229,7 @@ OperationContextSession::~OperationContextSession() { // SessionCatalog mutex, and other code may take the Client lock while holding that mutex. stdx::unique_lock<Client> lk(*_opCtx->getClient()); ScopedCheckedOutSession sessionToDelete(std::move(checkedOutSession.get())); + // This destroys the moved-from ScopedCheckedOutSession, and must be done within the client // lock. checkedOutSession = boost::none; diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index b07cf42221d..90f44b3b60c 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -31,6 +31,7 @@ #pragma once #include <boost/optional.hpp> +#include <vector> #include "mongo/base/disallow_copying.h" #include "mongo/db/logical_session_id.h" @@ -87,6 +88,13 @@ public: ScopedCheckedOutSession checkOutSession(OperationContext* opCtx); /** + * See the description of 'Session::kill' for more information on the session kill usage + * pattern. + */ + ScopedCheckedOutSession checkOutSessionForKill(OperationContext* opCtx, + Session::KillToken killToken); + + /** * Returns a reference to the specified cached session regardless of whether it is checked-out * or not. The returned session is not returned checked-out and is allowed to be checked-out * concurrently. @@ -98,30 +106,26 @@ public: ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid); /** - * Callback to be invoked when it is suspected that the on-disk session contents might not be in - * sync with what is in the sessions cache. - * - * If no specific document is available, the method will invalidate all sessions. Otherwise if - * one is avaiable (which is the case for insert/update/delete), it must contain _id field with - * a valid session entry, in which case only that particular session will be invalidated. If the - * _id field is missing or doesn't contain a valid serialization of logical session, the method - * will throw. This prevents invalid entries from making it in the collection. - */ - void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc); - - /** * Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to * each Session which matches the specified 'matcher'. * * NOTE: Since this method runs with the session catalog mutex, the work done by 'workerFn' is * not allowed to block, perform I/O or acquire any lock manager locks. + * Iterates through the SessionCatalog and applies 'workerFn' to each Session. This locks the + * SessionCatalog. * * TODO SERVER-33850: Take Matcher out of the SessionKiller namespace. */ - using ScanSessionsCallbackFn = stdx::function<void(Session*)>; + using ScanSessionsCallbackFn = stdx::function<void(WithLock, Session*)>; void scanSessions(const SessionKiller::Matcher& matcher, const ScanSessionsCallbackFn& workerFn); + /** + * Shortcut to invoke 'kill' on the specified session under the SessionCatalog mutex. Throws a + * NoSuchSession exception if the session doesn't exist. + */ + Session::KillToken killSession(const LogicalSessionId& lsid); + private: struct SessionRuntimeInfo { SessionRuntimeInfo(LogicalSessionId lsid) : session(std::move(lsid)) {} @@ -146,7 +150,8 @@ private: /** * Makes a session, previously checked out through 'checkoutSession', available again. */ - void _releaseSession(const LogicalSessionId& lsid); + void _releaseSession(const LogicalSessionId& lsid, + boost::optional<Session::KillToken> killToken); stdx::mutex _mutex; @@ -192,13 +197,16 @@ class ScopedCheckedOutSession { MONGO_DISALLOW_COPYING(ScopedCheckedOutSession); friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*); + friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*, + Session::KillToken); public: ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default; ~ScopedCheckedOutSession() { if (_scopedSession) { - SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId()); + SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId(), + std::move(_killToken)); } } @@ -219,11 +227,17 @@ public: } private: - ScopedCheckedOutSession(OperationContext* opCtx, ScopedSession scopedSession) - : _opCtx(opCtx), _scopedSession(std::move(scopedSession)) {} + ScopedCheckedOutSession(OperationContext* opCtx, + ScopedSession scopedSession, + boost::optional<Session::KillToken> killToken) + : _opCtx(opCtx), + _killToken(std::move(killToken)), + _scopedSession(std::move(scopedSession)) {} OperationContext* const _opCtx; + boost::optional<Session::KillToken> _killToken; + ScopedSession _scopedSession; }; diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 5ae981e97b4..c17c75aa6a3 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -38,12 +38,15 @@ #include "mongo/db/client.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/namespace_string.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/service_context.h" +#include "mongo/db/transaction_participant.h" #include "mongo/rpc/get_status_from_command_result.h" namespace mongo { void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) { - SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none); + invalidateSessions(opCtx, boost::none); const size_t initialExtentSize = 0; const bool capped = false; @@ -85,4 +88,54 @@ boost::optional<UUID> MongoDSessionCatalog::getTransactionTableUUID(OperationCon return coll->uuid(); } +void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx, + boost::optional<BSONObj> singleSessionDoc) { + const auto replCoord = repl::ReplicationCoordinator::get(opCtx); + bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet; + if (isReplSet) { + uassert(40528, + str::stream() << "Direct writes against " + << NamespaceString::kSessionTransactionsTableNamespace.ns() + << " cannot be performed using a transaction or on a session.", + !opCtx->getLogicalSessionId()); + } + + const auto catalog = SessionCatalog::get(opCtx); + + std::vector<Session::KillToken> sessionKillTokens; + + if (singleSessionDoc) { + sessionKillTokens.emplace_back(catalog->killSession(LogicalSessionId::parse( + IDLParserErrorContext("lsid"), singleSessionDoc->getField("_id").Obj()))); + } else { + SessionKiller::Matcher matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}); + catalog->scanSessions(matcher, + [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) { + sessionKillTokens.emplace_back(session->kill(sessionCatalogLock)); + }); + } + + if (sessionKillTokens.empty()) + return; + + stdx::thread([ + service = opCtx->getServiceContext(), + sessionKillTokens = std::move(sessionKillTokens) + ]() mutable { + auto uniqueClient = service->makeClient("Session catalog kill"); + auto uniqueOpCtx = uniqueClient->makeOperationContext(); + const auto opCtx = uniqueOpCtx.get(); + const auto catalog = SessionCatalog::get(opCtx); + + for (auto& sessionKillToken : sessionKillTokens) { + auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken)); + + auto const txnParticipant = + TransactionParticipant::getFromNonCheckedOutSession(session.get()); + txnParticipant->invalidate(); + } + }).detach(); +} + } // namespace mongo diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index fc89f3ef0c9..4496c8b17f3 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -50,6 +50,19 @@ public: * Required for rollback via refetch. */ static boost::optional<UUID> getTransactionTableUUID(OperationContext* opCtx); + + /** + * Callback to be invoked when it is suspected that the on-disk session contents might not be in + * sync with what is in the sessions cache. + * + * If no specific document is available, the method will invalidate all sessions. Otherwise if + * one is avaiable (which is the case for insert/update/delete), it must contain _id field with + * a valid session entry, in which case only that particular session will be invalidated. If the + * _id field is missing or doesn't contain a valid serialization of logical session, the method + * will throw. This prevents invalid entries from making it in the collection. + */ + static void invalidateSessions(OperationContext* opCtx, + boost::optional<BSONObj> singleSessionDoc); }; } // namespace mongo diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index c4b78ce0180..09f28e5e6c8 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -174,7 +174,9 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { std::vector<LogicalSessionId> lsids; - auto workerFn = [&](Session* session) { lsids.push_back(session->getSessionId()); }; + const auto workerFn = [&lsids](WithLock, Session* session) { + lsids.push_back(session->getSessionId()); + }; // Scan over zero Sessions. SessionKiller::Matcher matcherAllSessions( @@ -206,5 +208,246 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ASSERT_EQ(lsids.front(), lsid2); } +TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { + const auto lsid = makeLogicalSessionIdForTest(); + + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession unusedOperationContextSession(opCtx.get(), true); + } + + auto killToken = catalog()->killSession(lsid); + + // Make sure that regular session check-out will fail because the session is marked as killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the session, + // which we will use to confirm that session kill completion actually unblocks check-out + auto future = stdx::async(stdx::launch::async, [lsid] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready(); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession->currentOperation()); + } + + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession unusedOperationContextSession(opCtx.get(), true); + } + + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); +} + +TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { + const auto lsid = makeLogicalSessionIdForTest(); + + auto killToken = [this, &lsid] { + // Create the session so there is something to kill + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession operationContextSession(opCtx.get(), true); + + auto killToken = catalog()->killSession(lsid); + + // Make sure the owning operation context is interrupted + ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); + + // Make sure that the checkOutForKill call will wait for the owning operation context to + // check the session back in + auto future = stdx::async(stdx::launch::async, [lsid] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready(); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + + OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true); + }); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); + + return killToken; + }(); + + // Make sure that regular session check-out will fail because the session is marked as killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the session, + // which we will use to confirm that session kill completion actually unblocks check-out + auto future = stdx::async(stdx::launch::async, [lsid] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready(); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession->currentOperation()); + } + + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession unusedOperationContextSession(opCtx.get(), true); + } + + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); +} + +TEST_F(SessionCatalogTest, MarkSessionAsKilledThrowsWhenCalledTwice) { + const auto lsid = makeLogicalSessionIdForTest(); + + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession unusedOperationContextSession(opCtx.get(), true); + } + + auto killToken = catalog()->killSession(lsid); + + // Second mark as killed attempt will throw since the session is already killed + ASSERT_THROWS_CODE(catalog()->killSession(lsid), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); + + // Make sure that regular session check-out will fail because the session is marked as killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Finish "killing" the session so the SessionCatalog destructor doesn't complain + { + auto opCtx = makeOperationContext(); + auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession->currentOperation()); + } +} + +TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) { + const auto nonExistentLsid = makeLogicalSessionIdForTest(); + ASSERT_THROWS_CODE( + catalog()->killSession(nonExistentLsid), AssertionException, ErrorCodes::NoSuchSession); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { + // Create three sessions + const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest()}; + + std::vector<stdx::future<void>> futures; + + for (const auto& lsid : lsids) { + futures.emplace_back(stdx::async(stdx::launch::async, [lsid] { + ON_BLOCK_EXIT([&] { Client::destroy(); }); + Client::initThreadIfNotAlready(); + + { + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true); + ASSERT_THROWS_CODE(sideOpCtx->sleepFor(Hours{6}), + AssertionException, + ErrorCodes::ExceededTimeLimit); + } + + { + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true); + } + })); + + ASSERT(stdx::future_status::ready != + futures.back().wait_for(Milliseconds(10).toSystemDuration())); + } + + // Kill the first and the third sessions + { + std::vector<Session::KillToken> firstAndThirdTokens; + catalog()->scanSessions( + SessionKiller::Matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}), + [&lsids, &firstAndThirdTokens](WithLock sessionCatalogLock, Session* session) { + if (session->getSessionId() == lsids[0] || session->getSessionId() == lsids[2]) + firstAndThirdTokens.emplace_back( + session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit)); + }); + ASSERT_EQ(2U, firstAndThirdTokens.size()); + for (auto& killToken : firstAndThirdTokens) { + auto unusedSheckedOutSessionForKill( + catalog()->checkOutSessionForKill(_opCtx, std::move(killToken))); + } + futures[0].get(); + futures[2].get(); + } + + // Kill the second session + { + std::vector<Session::KillToken> secondToken; + catalog()->scanSessions( + SessionKiller::Matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}), + [&lsids, &secondToken](WithLock sessionCatalogLock, Session* session) { + if (session->getSessionId() == lsids[1]) + secondToken.emplace_back( + session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit)); + }); + ASSERT_EQ(1U, secondToken.size()); + for (auto& killToken : secondToken) { + auto unusedSheckedOutSessionForKill( + catalog()->checkOutSessionForKill(_opCtx, std::move(killToken))); + } + futures[1].get(); + } +} + } // namespace } // namespace mongo diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 95cbf2dd09b..17d440103a2 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -1051,29 +1051,11 @@ void TransactionParticipant::abortArbitraryTransaction() { _abortTransactionOnSession(lock); } -void TransactionParticipant::abortArbitraryTransactionIfExpired() { +bool TransactionParticipant::expired() const { stdx::lock_guard<stdx::mutex> lock(_mutex); - if (!_txnState.isInProgress(lock) || !_transactionExpireDate || - _transactionExpireDate >= Date_t::now()) { - return; - } - - const auto* session = getTransactionParticipant.owner(this); - auto currentOperation = session->currentOperation(); - if (currentOperation) { - // If an operation is still running for this transaction when it expires, kill the currently - // running operation. - stdx::lock_guard<Client> clientLock(*currentOperation->getClient()); - getGlobalServiceContext()->killOperation(currentOperation, ErrorCodes::ExceededTimeLimit); - } - - // Log after killing the current operation because jstests may wait to see this log message to - // imply that the operation has been killed. - log() << "Aborting transaction with txnNumber " << _activeTxnNumber << " on session with lsid " - << session->getSessionId().getId() - << " because it has been running for longer than 'transactionLifetimeLimitSeconds'"; - _abortTransactionOnSession(lock); + return _txnState.isInProgress(lock) && _transactionExpireDate && + _transactionExpireDate < getGlobalServiceContext()->getPreciseClockSource()->now(); } void TransactionParticipant::abortActiveTransaction(OperationContext* opCtx) { diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index a9830833c17..921acbbe3a4 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -321,12 +321,9 @@ public: void abortArbitraryTransaction(); /** - * Same as abortArbitraryTransaction, except only executes if _transactionExpireDate indicates - * that the transaction has expired. - * - * Not called with session checked out. + * Returns whether the transaction has exceedet its expiration time. */ - void abortArbitraryTransactionIfExpired(); + bool expired() const; /* * Aborts the transaction inside the transaction, releasing transaction resources. diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 70c5fe92ce2..d8e68714408 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -1111,8 +1111,9 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) { ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); - txnParticipant->abortArbitraryTransactionIfExpired(); - ASSERT_FALSE(txnParticipant->transactionIsAborted()); + ASSERT(!txnParticipant->expired()); + txnParticipant->abortArbitraryTransaction(); + ASSERT(!txnParticipant->transactionIsAborted()); ASSERT(_opObserver->transactionPrepared); } |