diff options
Diffstat (limited to 'src')
-rw-r--r-- | src/mongo/db/commands/killcursors_cmd.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/cursor_manager.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/kill_sessions_local.cpp | 28 | ||||
-rw-r--r-- | src/mongo/db/kill_sessions_local.h | 8 | ||||
-rw-r--r-- | src/mongo/db/session.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/session.h | 16 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 18 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 10 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 36 |
9 files changed, 108 insertions, 33 deletions
diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp index b883d2e2a05..453c703dfa4 100644 --- a/src/mongo/db/commands/killcursors_cmd.cpp +++ b/src/mongo/db/commands/killcursors_cmd.cpp @@ -89,7 +89,7 @@ private: if (txnToAbort) { auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort->first); if (session) { - (*session)->abortIfSnapshotRead(opCtx, txnToAbort->second); + (*session)->abortIfSnapshotRead(txnToAbort->second); } } diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index 08653cfda22..7f2b9fbe199 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -259,7 +259,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool if (txnToAbort) { auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort->first); if (session) { - (*session)->abortIfSnapshotRead(opCtx, txnToAbort->second); + (*session)->abortIfSnapshotRead(txnToAbort->second); } } @@ -318,7 +318,7 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t for (auto&& txnToAbort : txnsToAbort) { auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort.first); if (session) { - (*session)->abortIfSnapshotRead(opCtx, txnToAbort.second); + (*session)->abortIfSnapshotRead(txnToAbort.second); } } diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 2bff47e89cc..fd206cca8fa 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -37,28 +37,34 @@ #include "mongo/db/kill_sessions_common.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" +#include "mongo/db/session.h" +#include "mongo/db/session_catalog.h" #include "mongo/util/log.h" namespace mongo { - -SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { +namespace { +void killSessionsLocalKillCursors(OperationContext* opCtx, const SessionKiller::Matcher& matcher) { auto res = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher); - auto status = res.first; + uassertStatusOK(res.first); +} - if (status.isOK()) { - return std::vector<HostAndPort>{}; - } else { - return status; - } +void killSessionsLocalKillTransactions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { + SessionCatalog::get(opCtx)->scanSessions( + opCtx, matcher, [](OperationContext* opCtx, Session* session) { + session->abortTransaction(); + }); } +} // namespace SessionKiller::Result killSessionsLocal(OperationContext* opCtx, const SessionKiller::Matcher& matcher, SessionKiller::UniformRandomBitGenerator* urbg) { - uassertStatusOK(killSessionsLocalKillCursors(opCtx, matcher)); - return uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher)); + killSessionsLocalKillCursors(opCtx, matcher); + uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher)); + killSessionsLocalKillTransactions(opCtx, matcher); + return {std::vector<HostAndPort>{}}; } } // namespace mongo diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h index 0785eb9d6a1..7177fad6b0d 100644 --- a/src/mongo/db/kill_sessions_local.h +++ b/src/mongo/db/kill_sessions_local.h @@ -28,17 +28,15 @@ #pragma once -#include "mongo/db/kill_sessions.h" - #include "mongo/db/session_killer.h" namespace mongo { +/** + * Kills all cursors, ops, and transactions on mongod for sessions matching 'matcher'. + */ SessionKiller::Result killSessionsLocal(OperationContext* opCtx, const SessionKiller::Matcher& matcher, SessionKiller::UniformRandomBitGenerator* urbg); -SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); - } // namespace mongo diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp index 1d0fc2e0de4..36640b93cda 100644 --- a/src/mongo/db/session.cpp +++ b/src/mongo/db/session.cpp @@ -598,7 +598,7 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { // to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort // the current transaction. Note that it would indicate a user bug to have a newer // transaction on one shard while an older transaction is still active on another shard. - _releaseStashedTransactionResources(lg, opCtx); + _releaseStashedTransactionResources(lg); uasserted(ErrorCodes::TransactionAborted, str::stream() << "Transaction aborted. Active txnNumber is now " << _activeTxnNumber); @@ -617,19 +617,24 @@ void Session::unstashTransactionResources(OperationContext* opCtx) { } } -void Session::abortIfSnapshotRead(OperationContext* opCtx, TxnNumber txnNumber) { +void Session::abortIfSnapshotRead(TxnNumber txnNumber) { stdx::lock_guard<stdx::mutex> lg(_mutex); if (_activeTxnNumber == txnNumber && _autocommit) { - _releaseStashedTransactionResources(lg, opCtx); + _releaseStashedTransactionResources(lg); + _txnState = MultiDocumentTransactionState::kAborted; } } -void Session::_releaseStashedTransactionResources(WithLock wl, OperationContext* opCtx) { - if (opCtx->getWriteUnitOfWork()) { - opCtx->setWriteUnitOfWork(nullptr); - } +void Session::abortTransaction() { + stdx::lock_guard<stdx::mutex> lg(_mutex); + _releaseStashedTransactionResources(lg); + _txnState = MultiDocumentTransactionState::kAborted; +} +void Session::_releaseStashedTransactionResources(WithLock wl) { _txnResourceStash = boost::none; + _transactionOperations.clear(); + _txnState = MultiDocumentTransactionState::kNone; } void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index 64d1aeefd88..bb866ff2946 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -208,14 +208,12 @@ public: bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; /** - * Transfers management of both locks and WiredTiger transaction from the OperationContext to - * the Session. + * Transfers management of transaction resources from the OperationContext to the Session. */ void stashTransactionResources(OperationContext* opCtx); /** - * Transfers management of both locks and WiredTiger transaction from the Session to the - * OperationContext. + * Transfers management of transaction resources from the Session to the OperationContext. */ void unstashTransactionResources(OperationContext* opCtx); @@ -223,7 +221,12 @@ public: * If there is transaction in progress with transaction number 'txnNumber' and _autocommit=true, * aborts the transaction. */ - void abortIfSnapshotRead(OperationContext* opCtx, TxnNumber txnNumber); + void abortIfSnapshotRead(TxnNumber txnNumber); + + /** + * Aborts the transaction, releasing stashed transaction resources. + */ + void abortTransaction(); bool getAutocommit() const { return _autocommit; @@ -293,8 +296,7 @@ private: std::vector<StmtId> stmtIdsWritten, const repl::OpTime& lastStmtIdWriteTs); - // Releases stashed locks and WiredTiger transaction. This implicitly aborts the transaction. - void _releaseStashedTransactionResources(WithLock, OperationContext* opCtx); + void _releaseStashedTransactionResources(WithLock); const LogicalSessionId _sessionId; diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 5ae80ffa849..35943ed935c 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -36,6 +36,7 @@ #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" +#include "mongo/db/kill_sessions_common.h" #include "mongo/db/namespace_string.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" @@ -237,6 +238,23 @@ void SessionCatalog::invalidateSessions(OperationContext* opCtx, } } +void SessionCatalog::scanSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher, + stdx::function<void(OperationContext*, Session*)> workerFn) { + stdx::lock_guard<stdx::mutex> lg(_mutex); + + LOG(2) << "Beginning scanSessions. Scanning " << _txnTable.size() << " sessions."; + + for (auto it = _txnTable.begin(); it != _txnTable.end(); ++it) { + // TODO SERVER-33850: Rename KillAllSessionsByPattern and + // ScopedKillAllSessionsByPatternImpersonator to not refer to session kill. + if (const KillAllSessionsByPattern* pattern = matcher.match(it->first)) { + ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern); + workerFn(opCtx, &(it->second->txnState)); + } + } +} + std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo( WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { invariant(!opCtx->lockState()->inAWriteUnitOfWork()); diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 6dbb5b4fda4..ddcea4db9fb 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -31,6 +31,7 @@ #include "mongo/base/disallow_copying.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/session.h" +#include "mongo/db/session_killer.h" #include "mongo/stdx/condition_variable.h" #include "mongo/stdx/mutex.h" #include "mongo/stdx/unordered_map.h" @@ -135,6 +136,15 @@ public: */ void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc); + /** + * Iterates through the SessionCatalog and applies 'workerFn' to each Session. This locks the + * SessionCatalog. + * TODO SERVER-33850: Take Matcher out of the SessionKiller namespace. + */ + void scanSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher, + stdx::function<void(OperationContext*, Session*)> workerFn); + private: struct SessionRuntimeInfo { SessionRuntimeInfo(LogicalSessionId lsid) : txnState(std::move(lsid)) {} diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index 1dd009479fd..a02203e5554 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -278,5 +278,41 @@ TEST_F(SessionCatalogTest, OnlyCheckOutSessionWithCheckOutSessionTrue) { } } +TEST_F(SessionCatalogTest, ScanSessions) { + std::vector<LogicalSessionId> lsids; + auto workerFn = [&](OperationContext* opCtx, Session* session) { + lsids.push_back(session->getSessionId()); + }; + + // Scan over zero Sessions. + SessionKiller::Matcher matcherAllSessions( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx())}); + catalog()->scanSessions(opCtx(), matcherAllSessions, workerFn); + ASSERT(lsids.empty()); + + // Create three sessions in the catalog. + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdForTest(); + auto lsid3 = makeLogicalSessionIdForTest(); + { + auto scopedSession1 = catalog()->getOrCreateSession(opCtx(), lsid1); + auto scopedSession2 = catalog()->getOrCreateSession(opCtx(), lsid2); + auto scopedSession3 = catalog()->getOrCreateSession(opCtx(), lsid3); + } + + // Scan over all Sessions. + lsids.clear(); + catalog()->scanSessions(opCtx(), matcherAllSessions, workerFn); + ASSERT_EQ(lsids.size(), 3U); + + // Scan over all Sessions, visiting a particular Session. + SessionKiller::Matcher matcherLSID2( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx(), lsid2)}); + lsids.clear(); + catalog()->scanSessions(opCtx(), matcherLSID2, workerFn); + ASSERT_EQ(lsids.size(), 1U); + ASSERT_EQ(lsids.front(), lsid2); +} + } // namespace } // namespace mongo |