summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
author: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2018-10-09 08:17:38 -0400
committer: Kaloian Manassiev <kaloian.manassiev@mongodb.com> 2018-10-31 04:50:08 -0400
commit: 236c6c28a18210586673097ee436c5b613b6c46f (patch)
tree: 9c26586d5943845b8f3356cbbee41dc75533670d /src/mongo
parent: e701da7ff3ec84b2bb3b353fa748c22f7b2a5878 (diff)
download: mongo-236c6c28a18210586673097ee436c5b613b6c46f.tar.gz
SERVER-37244 Make sessions killable outside of the Session/TransactionParticipant object
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/SConscript | 28
-rw-r--r-- src/mongo/db/kill_sessions_local.cpp | 94
-rw-r--r-- src/mongo/db/kill_sessions_local.h | 3
-rw-r--r-- src/mongo/db/op_observer_impl.cpp | 14
-rw-r--r-- src/mongo/db/op_observer_impl_test.cpp | 108
-rw-r--r-- src/mongo/db/pipeline/process_interface_standalone.cpp | 2
-rw-r--r-- src/mongo/db/repl/replication_state_transition_lock_guard.cpp | 3
-rw-r--r-- src/mongo/db/repl/rs_rollback.cpp | 2
-rw-r--r-- src/mongo/db/session.cpp | 30
-rw-r--r-- src/mongo/db/session.h | 38
-rw-r--r-- src/mongo/db/session_catalog.cpp | 98
-rw-r--r-- src/mongo/db/session_catalog.h | 48
-rw-r--r-- src/mongo/db/session_catalog_mongod.cpp | 55
-rw-r--r-- src/mongo/db/session_catalog_mongod.h | 13
-rw-r--r-- src/mongo/db/session_catalog_test.cpp | 245
-rw-r--r-- src/mongo/db/transaction_participant.cpp | 24
-rw-r--r-- src/mongo/db/transaction_participant.h | 7
-rw-r--r-- src/mongo/db/transaction_participant_test.cpp | 5
18 files changed, 646 insertions, 171 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 44f61615274..580b995e181 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -637,6 +637,21 @@ env.Library(
)
env.Library(
+ target='session_catalog',
+ source=[
+ 'session_catalog.cpp',
+ 'session.cpp',
+ ],
+ LIBDEPS=[
+ 'kill_sessions',
+ 'logical_session_id',
+ ],
+ LIBDEPS_PRIVATE=[
+ 'service_context',
+ ]
+)
+
+env.Library(
target='catalog_raii',
source=[
'catalog_raii.cpp',
@@ -644,8 +659,6 @@ env.Library(
'retryable_writes_stats.cpp',
'server_transactions_metrics.cpp',
'session_catalog_mongod.cpp',
- 'session_catalog.cpp',
- 'session.cpp',
'single_transaction_stats.cpp',
'transaction_coordinator_factory.cpp',
'transaction_history_iterator.cpp',
@@ -668,15 +681,14 @@ env.Library(
'curop_metrics',
'dbdirectclient',
'index/index_access_method',
- 'kill_sessions',
- 'logical_session_id',
'namespace_string',
'repl/oplog_entry',
'repl/oplog_shim',
's/sharding_api_d',
+ 'session_catalog',
'stats/fill_locker_info',
+ 'stats/top',
'views/views',
- "stats/top",
],
LIBDEPS_PRIVATE=[
'commands/server_status',
@@ -1855,8 +1867,6 @@ env.Library(
'service_context_devnull_test_fixture.cpp',
],
LIBDEPS=[
- # this library is required only because of SERVER-29908
- '$BUILD_DIR/mongo/db/s/sharding_runtime_d',
'service_context_d_test_fixture',
],
LIBDEPS_PRIVATE=[
@@ -1967,8 +1977,8 @@ env.CppUnitTest(
],
LIBDEPS=[
'auth/authmocks',
- 'catalog_raii',
- 'repl/mock_repl_coord_server_fixture',
+ 'service_context_d_test_fixture',
+ 'session_catalog',
],
)
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index d416b328552..2ea7b2b6f1d 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -46,29 +46,54 @@
namespace mongo {
namespace {
+/**
+ * Shortcut method shared by the various forms of session kill below. Every session kill operation
+ * consists of the following stages:
+ * 1) Select the sessions to kill, based on their lsid or owning user account (achieved through the
+ * 'matcher') and further refining that list through the 'filterFn'.
+ * 2) If any of the selected sessions are currently checked out, interrupt the owning operation
+ * context with 'reason' as the code.
+ * 3) Finish killing the selected and interrupted sessions through the 'killSessionFn'.
+ */
void killSessionsAction(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
- const SessionCatalog::ScanSessionsCallbackFn& killSessionFn) {
+ const stdx::function<bool(Session*)>& filterFn,
+ const stdx::function<void(Session*)>& killSessionFn,
+ ErrorCodes::Error reason = ErrorCodes::Interrupted) {
const auto catalog = SessionCatalog::get(opCtx);
- catalog->scanSessions(matcher, [&](Session* session) {
+ std::vector<Session::KillToken> sessionKillTokens;
+ catalog->scanSessions(matcher, [&](WithLock sessionCatalogLock, Session* session) {
+ if (filterFn(session))
+ sessionKillTokens.emplace_back(session->kill(sessionCatalogLock, reason));
+ });
+
+ for (auto& sessionKillToken : sessionKillTokens) {
+ auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
+
// TODO (SERVER-33850): Rename KillAllSessionsByPattern and
// ScopedKillAllSessionsByPatternImpersonator to not refer to session kill
const KillAllSessionsByPattern* pattern = matcher.match(session->getSessionId());
invariant(pattern);
ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);
- killSessionFn(session);
- });
+ killSessionFn(session.get());
+ }
}
} // namespace
void killSessionsLocalKillTransactions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
- killSessionsAction(opCtx, matcher, [](Session* session) {
- TransactionParticipant::getFromNonCheckedOutSession(session)->abortArbitraryTransaction();
- });
+ const SessionKiller::Matcher& matcher,
+ ErrorCodes::Error reason) {
+ killSessionsAction(opCtx,
+ matcher,
+ [](Session*) { return true; },
+ [](Session* session) {
+ TransactionParticipant::getFromNonCheckedOutSession(session)
+ ->abortArbitraryTransaction();
+ },
+ reason);
}
SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
@@ -86,23 +111,52 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
void killAllExpiredTransactions(OperationContext* opCtx) {
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- killSessionsAction(opCtx, matcherAllSessions, [](Session* session) {
- try {
- TransactionParticipant::getFromNonCheckedOutSession(session)
- ->abortArbitraryTransactionIfExpired();
- } catch (const DBException& ex) {
- warning() << "May have failed to abort expired transaction on session "
- << session->getSessionId().getId() << " due to " << redact(ex.toStatus());
- }
- });
+ killSessionsAction(
+ opCtx,
+ matcherAllSessions,
+ [](Session* session) {
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session);
+
+ return txnParticipant->expired();
+ },
+ [](Session* session) {
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session);
+
+ LOG(0)
+ << "Aborting transaction with txnNumber " << txnParticipant->getActiveTxnNumber()
+ << " on session " << session->getSessionId().getId()
+ << " because it has been running for longer than 'transactionLifetimeLimitSeconds'";
+
+ // The try/catch block below is necessary because expired() in the filterFn above could
+ // return true for an expired but unprepared transaction, and by the time we get to
+ // actually kill it, the participant could theoretically have become prepared (being under
+ // the SessionCatalog mutex doesn't prevent a concurrently running thread from
+ // preparing the participant).
+ //
+ // Then, when the execution reaches the killSessionFn, it would find that the transaction is
+ // prepared and not allowed to be killed, which would cause the exception caught below.
+ try {
+ txnParticipant->abortArbitraryTransaction();
+ } catch (const DBException& ex) {
+ warning() << "May have failed to abort expired transaction on session "
+ << session->getSessionId().getId() << " due to " << redact(ex.toStatus());
+ }
+ },
+ ErrorCodes::ExceededTimeLimit);
}
void killSessionsLocalShutdownAllTransactions(OperationContext* opCtx) {
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- killSessionsAction(opCtx, matcherAllSessions, [](Session* session) {
- TransactionParticipant::getFromNonCheckedOutSession(session)->shutdown();
- });
+ killSessionsAction(opCtx,
+ matcherAllSessions,
+ [](Session*) { return true; },
+ [](Session* session) {
+ TransactionParticipant::getFromNonCheckedOutSession(session)->shutdown();
+ },
+ ErrorCodes::InterruptedAtShutdown);
}
} // namespace mongo
diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h
index 3d6f0699cba..60cb150904a 100644
--- a/src/mongo/db/kill_sessions_local.h
+++ b/src/mongo/db/kill_sessions_local.h
@@ -48,7 +48,8 @@ SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
* Kills all transactions on mongod for sessions matching 'matcher'.
*/
void killSessionsLocalKillTransactions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
+ const SessionKiller::Matcher& matcher,
+ ErrorCodes::Error reason = ErrorCodes::Interrupted);
/**
* Aborts any expired transactions.
diff --git a/src/mongo/db/op_observer_impl.cpp b/src/mongo/db/op_observer_impl.cpp
index a295be04d5a..1ccf72f16da 100644
--- a/src/mongo/db/op_observer_impl.cpp
+++ b/src/mongo/db/op_observer_impl.cpp
@@ -53,7 +53,7 @@
#include "mongo/db/repl/replication_coordinator.h"
#include "mongo/db/s/collection_sharding_state.h"
#include "mongo/db/server_options.h"
-#include "mongo/db/session_catalog.h"
+#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/views/durable_view_catalog.h"
#include "mongo/scripting/engine.h"
@@ -435,7 +435,7 @@ void OpObserverImpl::onInserts(OperationContext* opCtx,
}
} else if (nss == NamespaceString::kSessionTransactionsTableNamespace && !lastOpTime.isNull()) {
for (auto it = first; it != last; it++) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, it->doc);
+ MongoDSessionCatalog::invalidateSessions(opCtx, it->doc);
}
}
}
@@ -504,7 +504,7 @@ void OpObserverImpl::onUpdate(OperationContext* opCtx, const OplogUpdateEntryArg
FeatureCompatibilityVersion::onInsertOrUpdate(opCtx, args.updateArgs.updatedDoc);
} else if (args.nss == NamespaceString::kSessionTransactionsTableNamespace &&
!opTime.writeOpTime.isNull()) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, args.updateArgs.updatedDoc);
+ MongoDSessionCatalog::invalidateSessions(opCtx, args.updateArgs.updatedDoc);
}
}
@@ -570,7 +570,7 @@ void OpObserverImpl::onDelete(OperationContext* opCtx,
uasserted(40670, "removing FeatureCompatibilityVersion document is not allowed");
} else if (nss == NamespaceString::kSessionTransactionsTableNamespace &&
!opTime.writeOpTime.isNull()) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, documentKey);
+ MongoDSessionCatalog::invalidateSessions(opCtx, documentKey);
}
}
@@ -730,7 +730,7 @@ void OpObserverImpl::onDropDatabase(OperationContext* opCtx, const std::string&
50714, "dropping the admin database is not allowed.", dbName != NamespaceString::kAdminDb);
if (dbName == NamespaceString::kSessionTransactionsTableNamespace.db()) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
+ MongoDSessionCatalog::invalidateSessions(opCtx, boost::none);
}
NamespaceUUIDCache::get(opCtx).evictNamespacesInDatabase(dbName);
@@ -770,7 +770,7 @@ repl::OpTime OpObserverImpl::onDropCollection(OperationContext* opCtx,
if (collectionName.coll() == DurableViewCatalog::viewsCollectionName()) {
DurableViewCatalog::onExternalChange(opCtx, collectionName);
} else if (collectionName == NamespaceString::kSessionTransactionsTableNamespace) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
+ MongoDSessionCatalog::invalidateSessions(opCtx, boost::none);
}
AuthorizationManager::get(opCtx->getServiceContext())
@@ -1145,7 +1145,7 @@ void OpObserverImpl::onReplicationRollback(OperationContext* opCtx,
// If there were ops rolled back that were part of operations on a session, then invalidate
// the session cache.
if (rbInfo.rollbackSessionIds.size() > 0) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
+ MongoDSessionCatalog::invalidateSessions(opCtx, boost::none);
}
// Reset the key manager cache.
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index abbb9b94754..dd4cbc4b021 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -345,62 +345,106 @@ public:
}
};
-TEST_F(OpObserverSessionCatalogTest, OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) {
- OpObserverImpl opObserver;
- auto opCtx = cc().makeOperationContext();
+using OpObserverSessionCatalogRollbackTest = OpObserverSessionCatalogTest;
+
+TEST_F(OpObserverSessionCatalogRollbackTest,
+ OnRollbackInvalidatesSessionCatalogIfSessionOpsRolledBack) {
const NamespaceString nss("testDB", "testColl");
// Create a session.
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
- auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
- const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
- // Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
- simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
- // Check that the statement executed.
- ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+
+ // Create a session and sync it from disk
+ auto session = sessionCatalog->checkOutSession(opCtx.get());
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
+
+ // Simulate a write occurring on that session
+ simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
+
+ // Check that the statement executed
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ }
// The OpObserver should invalidate in-memory session state, so the check after this should
// fail.
- OpObserver::RollbackObserverInfo rbInfo;
- rbInfo.rollbackSessionIds = {UUID::gen()};
- opObserver.onReplicationRollback(opCtx.get(), rbInfo);
- ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
- DBException,
- ErrorCodes::ConflictingOperationInProgress);
+ {
+ auto opCtx = cc().makeOperationContext();
+
+ OpObserverImpl opObserver;
+ OpObserver::RollbackObserverInfo rbInfo;
+ rbInfo.rollbackSessionIds = {UUID::gen()};
+ opObserver.onReplicationRollback(opCtx.get(), rbInfo);
+ }
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+
+ auto session = sessionCatalog->checkOutSession(opCtx.get());
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ ASSERT_THROWS_CODE(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId),
+ DBException,
+ ErrorCodes::ConflictingOperationInProgress);
+ }
}
-TEST_F(OpObserverSessionCatalogTest,
+TEST_F(OpObserverSessionCatalogRollbackTest,
OnRollbackDoesntInvalidateSessionCatalogIfNoSessionOpsRolledBack) {
- OpObserverImpl opObserver;
- auto opCtx = cc().makeOperationContext();
const NamespaceString nss("testDB", "testColl");
- // Create a session.
auto sessionCatalog = SessionCatalog::get(getServiceContext());
auto sessionId = makeLogicalSessionIdForTest();
- auto session = sessionCatalog->getOrCreateSession(opCtx.get(), sessionId);
- const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
- // Simulate a write occurring on that session.
const TxnNumber txnNum = 0;
const StmtId stmtId = 1000;
- simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
- // Check that the statement executed.
- ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+
+ // Create a session and sync it from disk
+ auto session = sessionCatalog->checkOutSession(opCtx.get());
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
- // The OpObserver should not invalidate the in-memory session state, so the check after this
- // should still succeed.
- OpObserver::RollbackObserverInfo rbInfo;
- opObserver.onReplicationRollback(opCtx.get(), rbInfo);
- ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ // Simulate a write occurring on that session
+ simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
+
+ // Check that the statement executed
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ }
+
+ // Because there are no sessions to roll back, the OpObserver should not invalidate the in-memory
+ // session state, so the check after this should still succeed.
+ {
+ auto opCtx = cc().makeOperationContext();
+
+ OpObserverImpl opObserver;
+ OpObserver::RollbackObserverInfo rbInfo;
+ opObserver.onReplicationRollback(opCtx.get(), rbInfo);
+ }
+
+ {
+ auto opCtx = cc().makeOperationContext();
+ opCtx->setLogicalSessionId(sessionId);
+
+ auto session = sessionCatalog->checkOutSession(opCtx.get());
+ const auto txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, stmtId));
+ }
}
/**
diff --git a/src/mongo/db/pipeline/process_interface_standalone.cpp b/src/mongo/db/pipeline/process_interface_standalone.cpp
index 75fb80d5183..660ee570b43 100644
--- a/src/mongo/db/pipeline/process_interface_standalone.cpp
+++ b/src/mongo/db/pipeline/process_interface_standalone.cpp
@@ -495,7 +495,7 @@ void MongoInterfaceStandalone::_reportCurrentOpsForIdleSessions(OperationContext
sessionCatalog->scanSessions(
{std::move(sessionFilter)},
- [&](Session* session) {
+ [&](WithLock sessionCatalogLock, Session* session) {
auto op =
TransactionParticipant::getFromNonCheckedOutSession(session)->reportStashedState();
if (!op.isEmpty()) {
diff --git a/src/mongo/db/repl/replication_state_transition_lock_guard.cpp b/src/mongo/db/repl/replication_state_transition_lock_guard.cpp
index b78fb7f290e..5e557b81857 100644
--- a/src/mongo/db/repl/replication_state_transition_lock_guard.cpp
+++ b/src/mongo/db/repl/replication_state_transition_lock_guard.cpp
@@ -55,7 +55,8 @@ ReplicationStateTransitionLockGuard::ReplicationStateTransitionLockGuard(Operati
// Destroy all stashed transaction resources, in order to release locks.
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
- killSessionsLocalKillTransactions(opCtx, matcherAllSessions);
+ killSessionsLocalKillTransactions(
+ opCtx, matcherAllSessions, ErrorCodes::InterruptedDueToStepDown);
}
_globalLock->waitForLockUntil(args.lockDeadline);
diff --git a/src/mongo/db/repl/rs_rollback.cpp b/src/mongo/db/repl/rs_rollback.cpp
index 33963224c53..9c76bbf5b18 100644
--- a/src/mongo/db/repl/rs_rollback.cpp
+++ b/src/mongo/db/repl/rs_rollback.cpp
@@ -1435,7 +1435,7 @@ void rollback_internal::syncFixUp(OperationContext* opCtx,
// If necessary, clear the memory of existing sessions.
if (fixUpInfo.refetchTransactionDocs) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
+ MongoDSessionCatalog::invalidateSessions(opCtx, boost::none);
}
if (auto validator = LogicalTimeValidator::get(opCtx)) {
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index a190f7daf43..3936dac0478 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -32,6 +32,8 @@
#include "mongo/db/session.h"
+#include "mongo/db/operation_context.h"
+
namespace mongo {
Session::Session(LogicalSessionId sessionId) : _sessionId(std::move(sessionId)) {}
@@ -41,6 +43,28 @@ OperationContext* Session::currentOperation() const {
return _checkoutOpCtx;
}
+Session::KillToken Session::kill(WithLock sessionCatalogLock, ErrorCodes::Error reason) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ uassert(ErrorCodes::ConflictingOperationInProgress, "Session already killed", !_killRequested);
+ _killRequested = true;
+
+ // For currently checked-out sessions, interrupt the operation context so that the current owner
+ // can release the session
+ if (_checkoutOpCtx) {
+ const auto serviceContext = _checkoutOpCtx->getServiceContext();
+
+ stdx::lock_guard<Client> clientLock(*_checkoutOpCtx->getClient());
+ serviceContext->killOperation(_checkoutOpCtx, reason);
+ }
+
+ return KillToken(getSessionId());
+}
+
+bool Session::killed() const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _killRequested;
+}
+
void Session::_markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
invariant(!_checkoutOpCtx);
@@ -53,4 +77,10 @@ void Session::_markCheckedIn(WithLock sessionCatalogLock) {
_checkoutOpCtx = nullptr;
}
+void Session::_markNotKilled(WithLock sessionCatalogLock, KillToken killToken) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ invariant(_killRequested);
+ _killRequested = false;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index e1e95741c11..f7349e0d29e 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -64,6 +64,35 @@ public:
*/
OperationContext* currentOperation() const;
+ /**
+ * Marks the session as killed and returns a 'kill token' to be passed later on to
+ * 'checkOutSessionForKill' method of the SessionCatalog in order to permit the caller to
+ * execute any kill cleanup tasks and pass further on to '_markNotKilled' in order to reset the
+ * kill state. Marking session as killed is an internal property only that will cause any
+ * further calls to 'checkOutSession' to block until 'checkOutSessionForKill' is called and the
+ * returned scoped object destroyed.
+ *
+ * If the session is currently checked-out, this method will also interrupt the operation
+ * context which has it checked-out.
+ *
+ * If the session is already killed, throws a ConflictingOperationInProgress exception.
+ *
+ * Must be called under the owning SessionCatalog's lock.
+ */
+ struct KillToken {
+ KillToken(LogicalSessionId lsid) : lsidToKill(std::move(lsid)) {}
+ KillToken(KillToken&&) = default;
+ KillToken& operator=(KillToken&&) = default;
+
+ LogicalSessionId lsidToKill;
+ };
+ KillToken kill(WithLock sessionCatalogLock, ErrorCodes::Error reason = ErrorCodes::Interrupted);
+
+ /**
+ * Returns whether 'kill' has been called on this session.
+ */
+ bool killed() const;
+
private:
/**
* Set/clear the current check-out state of the session by storing the operation which has this
@@ -74,6 +103,12 @@ private:
void _markCheckedOut(WithLock sessionCatalogLock, OperationContext* checkoutOpCtx);
void _markCheckedIn(WithLock sessionCatalogLock);
+ /**
+ * Used by the session catalog when checking a session back in after a call to 'kill'. See the
+ * comments for 'kill' for more details.
+ */
+ void _markNotKilled(WithLock sessionCatalogLock, KillToken killToken);
+
// The id of the session with which this object is associated
const LogicalSessionId _sessionId;
@@ -87,6 +122,9 @@ private:
// A pointer back to the currently running operation on this Session, or nullptr if there
// is no operation currently running for the Session.
OperationContext* _checkoutOpCtx{nullptr};
+
+ // Set to true by 'kill' and reset to false by '_markNotKilled'.
+ bool _killRequested{false};
};
} // namespace mongo
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index abc5165e5a3..70049de30a4 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -34,12 +34,8 @@
#include "mongo/db/session_catalog.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/transaction_participant.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -58,6 +54,7 @@ SessionCatalog::~SessionCatalog() {
for (const auto& entry : _sessions) {
auto& sri = entry.second;
invariant(!sri->session.currentOperation());
+ invariant(!sri->session.killed());
}
}
@@ -88,6 +85,31 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx)
stdx::unique_lock<stdx::mutex> ul(_mutex);
auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ // Wait until the session is no longer checked out and until the previously scheduled kill has
+ // completed
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() {
+ return !sri->session.currentOperation() && !sri->session.killed();
+ });
+
+ sri->session._markCheckedOut(ul, opCtx);
+
+ return ScopedCheckedOutSession(
+ opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */);
+}
+
+ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
+ Session::KillToken killToken) {
+ // This method is not supposed to be called with an already checked-out session due to risk of
+ // deadlock
+ invariant(!operationSessionDecoration(opCtx));
+ invariant(!opCtx->getTxnNumber());
+
+ const auto lsid = killToken.lsidToKill;
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ invariant(sri->session.killed());
+
// Wait until the session is no longer checked out
opCtx->waitForConditionOrInterrupt(
sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); });
@@ -95,7 +117,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx)
invariant(!sri->session.currentOperation());
sri->session._markCheckedOut(ul, opCtx);
- return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri)));
+ return ScopedCheckedOutSession(
+ opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */);
}
ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
@@ -112,49 +135,6 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
return ss;
}
-void SessionCatalog::invalidateSessions(OperationContext* opCtx,
- boost::optional<BSONObj> singleSessionDoc) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- if (isReplSet) {
- uassert(40528,
- str::stream() << "Direct writes against "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " cannot be performed using a transaction or on a session.",
- !opCtx->getLogicalSessionId());
- }
-
- const auto invalidateSessionFn = [&](WithLock, decltype(_sessions)::iterator it) {
- auto& sri = it->second;
- auto const txnParticipant =
- TransactionParticipant::getFromNonCheckedOutSession(&sri->session);
- txnParticipant->invalidate();
-
- // We cannot remove checked-out sessions from the cache, because operations expect to find
- // them there to check back in
- if (!sri->session.currentOperation()) {
- _sessions.erase(it);
- }
- };
-
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
- if (singleSessionDoc) {
- const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"),
- singleSessionDoc->getField("_id").Obj());
-
- auto it = _sessions.find(lsid);
- if (it != _sessions.end()) {
- invalidateSessionFn(lg, it);
- }
- } else {
- auto it = _sessions.begin();
- while (it != _sessions.end()) {
- invalidateSessionFn(lg, it++);
- }
- }
-}
-
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
@@ -163,15 +143,22 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
for (auto& sessionEntry : _sessions) {
if (matcher.match(sessionEntry.first)) {
- workerFn(&sessionEntry.second->session);
+ workerFn(lg, &sessionEntry.second->session);
}
}
}
+Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ auto it = _sessions.find(lsid);
+ uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end());
+
+ auto& sri = it->second;
+ return sri->session.kill(lg);
+}
+
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo(
WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) {
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
-
auto it = _sessions.find(lsid);
if (it == _sessions.end()) {
it = _sessions.emplace(lsid, std::make_shared<SessionRuntimeInfo>(lsid)).first;
@@ -180,7 +167,8 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate
return it->second;
}
-void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
+void SessionCatalog::_releaseSession(const LogicalSessionId& lsid,
+ boost::optional<Session::KillToken> killToken) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
auto it = _sessions.find(lsid);
@@ -191,6 +179,11 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
sri->session._markCheckedIn(lg);
sri->availableCondVar.notify_one();
+
+ if (killToken) {
+ invariant(sri->session.killed());
+ sri->session._markNotKilled(lg, std::move(*killToken));
+ }
}
OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession)
@@ -236,6 +229,7 @@ OperationContextSession::~OperationContextSession() {
// SessionCatalog mutex, and other code may take the Client lock while holding that mutex.
stdx::unique_lock<Client> lk(*_opCtx->getClient());
ScopedCheckedOutSession sessionToDelete(std::move(checkedOutSession.get()));
+
// This destroys the moved-from ScopedCheckedOutSession, and must be done within the client
// lock.
checkedOutSession = boost::none;
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index b07cf42221d..90f44b3b60c 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -31,6 +31,7 @@
#pragma once
#include <boost/optional.hpp>
+#include <vector>
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
@@ -87,6 +88,13 @@ public:
ScopedCheckedOutSession checkOutSession(OperationContext* opCtx);
/**
+ * See the description of 'Session::kill' for more information on the session kill usage
+ * pattern.
+ */
+ ScopedCheckedOutSession checkOutSessionForKill(OperationContext* opCtx,
+ Session::KillToken killToken);
+
+ /**
* Returns a reference to the specified cached session regardless of whether it is checked-out
* or not. The returned session is not returned checked-out and is allowed to be checked-out
* concurrently.
@@ -98,30 +106,26 @@ public:
ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
- * Callback to be invoked when it is suspected that the on-disk session contents might not be in
- * sync with what is in the sessions cache.
- *
- * If no specific document is available, the method will invalidate all sessions. Otherwise if
- * one is avaiable (which is the case for insert/update/delete), it must contain _id field with
- * a valid session entry, in which case only that particular session will be invalidated. If the
- * _id field is missing or doesn't contain a valid serialization of logical session, the method
- * will throw. This prevents invalid entries from making it in the collection.
- */
- void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc);
-
- /**
* Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to
* each Session which matches the specified 'matcher'.
*
* NOTE: Since this method runs with the session catalog mutex, the work done by 'workerFn' is
* not allowed to block, perform I/O or acquire any lock manager locks.
+ * 'workerFn' receives the held SessionCatalog lock as its WithLock argument, so it may invoke
+ * methods which must run under that lock (such as 'Session::kill').
*
* TODO SERVER-33850: Take Matcher out of the SessionKiller namespace.
*/
- using ScanSessionsCallbackFn = stdx::function<void(Session*)>;
+ using ScanSessionsCallbackFn = stdx::function<void(WithLock, Session*)>;
void scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn);
+ /**
+ * Shortcut to invoke 'kill' on the specified session under the SessionCatalog mutex. Throws a
+ * NoSuchSession exception if the session doesn't exist.
+ */
+ Session::KillToken killSession(const LogicalSessionId& lsid);
+
private:
struct SessionRuntimeInfo {
SessionRuntimeInfo(LogicalSessionId lsid) : session(std::move(lsid)) {}
@@ -146,7 +150,8 @@ private:
/**
* Makes a session, previously checked out through 'checkoutSession', available again.
*/
- void _releaseSession(const LogicalSessionId& lsid);
+ void _releaseSession(const LogicalSessionId& lsid,
+ boost::optional<Session::KillToken> killToken);
stdx::mutex _mutex;
@@ -192,13 +197,16 @@ class ScopedCheckedOutSession {
MONGO_DISALLOW_COPYING(ScopedCheckedOutSession);
friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*);
+ friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*,
+ Session::KillToken);
public:
ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default;
~ScopedCheckedOutSession() {
if (_scopedSession) {
- SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId());
+ SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId(),
+ std::move(_killToken));
}
}
@@ -219,11 +227,17 @@ public:
}
private:
- ScopedCheckedOutSession(OperationContext* opCtx, ScopedSession scopedSession)
- : _opCtx(opCtx), _scopedSession(std::move(scopedSession)) {}
+ ScopedCheckedOutSession(OperationContext* opCtx,
+ ScopedSession scopedSession,
+ boost::optional<Session::KillToken> killToken)
+ : _opCtx(opCtx),
+ _killToken(std::move(killToken)),
+ _scopedSession(std::move(scopedSession)) {}
OperationContext* const _opCtx;
+ boost::optional<Session::KillToken> _killToken;
+
ScopedSession _scopedSession;
};
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 5ae981e97b4..c17c75aa6a3 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -38,12 +38,15 @@
#include "mongo/db/client.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/namespace_string.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/service_context.h"
+#include "mongo/db/transaction_participant.h"
#include "mongo/rpc/get_status_from_command_result.h"
namespace mongo {
void MongoDSessionCatalog::onStepUp(OperationContext* opCtx) {
- SessionCatalog::get(opCtx)->invalidateSessions(opCtx, boost::none);
+ invalidateSessions(opCtx, boost::none);
const size_t initialExtentSize = 0;
const bool capped = false;
@@ -85,4 +88,54 @@ boost::optional<UUID> MongoDSessionCatalog::getTransactionTableUUID(OperationCon
return coll->uuid();
}
+void MongoDSessionCatalog::invalidateSessions(OperationContext* opCtx,
+ boost::optional<BSONObj> singleSessionDoc) {
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
+ if (isReplSet) {
+ uassert(40528,
+ str::stream() << "Direct writes against "
+ << NamespaceString::kSessionTransactionsTableNamespace.ns()
+ << " cannot be performed using a transaction or on a session.",
+ !opCtx->getLogicalSessionId());
+ }
+
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ std::vector<Session::KillToken> sessionKillTokens;
+
+ if (singleSessionDoc) {
+ sessionKillTokens.emplace_back(catalog->killSession(LogicalSessionId::parse(
+ IDLParserErrorContext("lsid"), singleSessionDoc->getField("_id").Obj())));
+ } else {
+ SessionKiller::Matcher matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)});
+ catalog->scanSessions(matcher,
+ [&sessionKillTokens](WithLock sessionCatalogLock, Session* session) {
+ sessionKillTokens.emplace_back(session->kill(sessionCatalogLock));
+ });
+ }
+
+ if (sessionKillTokens.empty())
+ return;
+
+ stdx::thread([
+ service = opCtx->getServiceContext(),
+ sessionKillTokens = std::move(sessionKillTokens)
+ ]() mutable {
+ auto uniqueClient = service->makeClient("Session catalog kill");
+ auto uniqueOpCtx = uniqueClient->makeOperationContext();
+ const auto opCtx = uniqueOpCtx.get();
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ for (auto& sessionKillToken : sessionKillTokens) {
+ auto session = catalog->checkOutSessionForKill(opCtx, std::move(sessionKillToken));
+
+ auto const txnParticipant =
+ TransactionParticipant::getFromNonCheckedOutSession(session.get());
+ txnParticipant->invalidate();
+ }
+ }).detach();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index fc89f3ef0c9..4496c8b17f3 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -50,6 +50,19 @@ public:
* Required for rollback via refetch.
*/
static boost::optional<UUID> getTransactionTableUUID(OperationContext* opCtx);
+
+ /**
+ * Callback to be invoked when it is suspected that the on-disk session contents might not be in
+ * sync with what is in the sessions cache.
+ *
+ * If no specific document is available, the method will invalidate all sessions. Otherwise if
+ * one is available (which is the case for insert/update/delete), it must contain _id field with
+ * a valid session entry, in which case only that particular session will be invalidated. If the
+ * _id field is missing or doesn't contain a valid serialization of logical session, the method
+ * will throw. This prevents invalid entries from making it in the collection.
+ */
+ static void invalidateSessions(OperationContext* opCtx,
+ boost::optional<BSONObj> singleSessionDoc);
};
} // namespace mongo
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index c4b78ce0180..09f28e5e6c8 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -174,7 +174,9 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
std::vector<LogicalSessionId> lsids;
- auto workerFn = [&](Session* session) { lsids.push_back(session->getSessionId()); };
+ const auto workerFn = [&lsids](WithLock, Session* session) {
+ lsids.push_back(session->getSessionId());
+ };
// Scan over zero Sessions.
SessionKiller::Matcher matcherAllSessions(
@@ -206,5 +208,246 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
ASSERT_EQ(lsids.front(), lsid2);
}
+TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
+ const auto lsid = makeLogicalSessionIdForTest();
+
+ // Create the session so there is something to kill
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession unusedOperationContextSession(opCtx.get(), true);
+ }
+
+ auto killToken = catalog()->killSession(lsid);
+
+ // Make sure that regular session check-out will fail because the session is marked as killed
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+ ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true),
+ AssertionException,
+ ErrorCodes::MaxTimeMSExpired);
+ }
+
+ // Schedule a separate "regular operation" thread, which will block on checking-out the session,
+ // which we will use to confirm that session kill completion actually unblocks check-out
+ auto future = stdx::async(stdx::launch::async, [lsid] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready();
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ sideOpCtx->setLogicalSessionId(lsid);
+
+ OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true);
+ });
+ ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
+
+ // Make sure that "for kill" session check-out succeeds
+ {
+ auto opCtx = makeOperationContext();
+ auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
+ ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ }
+
+ // Make sure that session check-out after kill succeeds again
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession unusedOperationContextSession(opCtx.get(), true);
+ }
+
+ // Make sure the "regular operation" eventually is able to proceed and use the just killed
+ // session
+ future.get();
+}
+
+TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
+ const auto lsid = makeLogicalSessionIdForTest();
+
+ auto killToken = [this, &lsid] {
+ // Create the session so there is something to kill
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession operationContextSession(opCtx.get(), true);
+
+ auto killToken = catalog()->killSession(lsid);
+
+ // Make sure the owning operation context is interrupted
+ ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted);
+
+ // Make sure that a regular check-out from another operation times out while the killed
+ // session is still checked out by its owning operation context
+ auto future = stdx::async(stdx::launch::async, [lsid] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready();
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ sideOpCtx->setLogicalSessionId(lsid);
+ sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+
+ OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true);
+ });
+
+ ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired);
+
+ return killToken;
+ }();
+
+ // Make sure that regular session check-out will fail because the session is marked as killed
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+ ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true),
+ AssertionException,
+ ErrorCodes::MaxTimeMSExpired);
+ }
+
+ // Schedule a separate "regular operation" thread, which will block on checking-out the session,
+ // which we will use to confirm that session kill completion actually unblocks check-out
+ auto future = stdx::async(stdx::launch::async, [lsid] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready();
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ sideOpCtx->setLogicalSessionId(lsid);
+
+ OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true);
+ });
+ ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
+
+ // Make sure that "for kill" session check-out succeeds
+ {
+ auto opCtx = makeOperationContext();
+ auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
+ ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ }
+
+ // Make sure that session check-out after kill succeeds again
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession unusedOperationContextSession(opCtx.get(), true);
+ }
+
+ // Make sure the "regular operation" eventually is able to proceed and use the just killed
+ // session
+ future.get();
+}
+
+TEST_F(SessionCatalogTest, MarkSessionAsKilledThrowsWhenCalledTwice) {
+ const auto lsid = makeLogicalSessionIdForTest();
+
+ // Create the session so there is something to kill
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession unusedOperationContextSession(opCtx.get(), true);
+ }
+
+ auto killToken = catalog()->killSession(lsid);
+
+ // Second mark as killed attempt will throw since the session is already killed
+ ASSERT_THROWS_CODE(catalog()->killSession(lsid),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
+
+ // Make sure that regular session check-out will fail because the session is marked as killed
+ {
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
+ ASSERT_THROWS_CODE(OperationContextSession(opCtx.get(), true),
+ AssertionException,
+ ErrorCodes::MaxTimeMSExpired);
+ }
+
+ // Finish "killing" the session so the SessionCatalog destructor doesn't complain
+ {
+ auto opCtx = makeOperationContext();
+ auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken));
+ ASSERT_EQ(opCtx.get(), scopedSession->currentOperation());
+ }
+}
+
+TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) {
+ const auto nonExistentLsid = makeLogicalSessionIdForTest();
+ ASSERT_THROWS_CODE(
+ catalog()->killSession(nonExistentLsid), AssertionException, ErrorCodes::NoSuchSession);
+}
+
+TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
+ // Create three sessions
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest()};
+
+ std::vector<stdx::future<void>> futures;
+
+ for (const auto& lsid : lsids) {
+ futures.emplace_back(stdx::async(stdx::launch::async, [lsid] {
+ ON_BLOCK_EXIT([&] { Client::destroy(); });
+ Client::initThreadIfNotAlready();
+
+ {
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ sideOpCtx->setLogicalSessionId(lsid);
+
+ OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true);
+ ASSERT_THROWS_CODE(sideOpCtx->sleepFor(Hours{6}),
+ AssertionException,
+ ErrorCodes::ExceededTimeLimit);
+ }
+
+ {
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ sideOpCtx->setLogicalSessionId(lsid);
+
+ OperationContextSession unusedOperationContextSession(sideOpCtx.get(), true);
+ }
+ }));
+
+ ASSERT(stdx::future_status::ready !=
+ futures.back().wait_for(Milliseconds(10).toSystemDuration()));
+ }
+
+ // Kill the first and the third sessions
+ {
+ std::vector<Session::KillToken> firstAndThirdTokens;
+ catalog()->scanSessions(
+ SessionKiller::Matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
+ [&lsids, &firstAndThirdTokens](WithLock sessionCatalogLock, Session* session) {
+ if (session->getSessionId() == lsids[0] || session->getSessionId() == lsids[2])
+ firstAndThirdTokens.emplace_back(
+ session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ });
+ ASSERT_EQ(2U, firstAndThirdTokens.size());
+ for (auto& killToken : firstAndThirdTokens) {
+ auto unusedSheckedOutSessionForKill(
+ catalog()->checkOutSessionForKill(_opCtx, std::move(killToken)));
+ }
+ futures[0].get();
+ futures[2].get();
+ }
+
+ // Kill the second session
+ {
+ std::vector<Session::KillToken> secondToken;
+ catalog()->scanSessions(
+ SessionKiller::Matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}),
+ [&lsids, &secondToken](WithLock sessionCatalogLock, Session* session) {
+ if (session->getSessionId() == lsids[1])
+ secondToken.emplace_back(
+ session->kill(sessionCatalogLock, ErrorCodes::ExceededTimeLimit));
+ });
+ ASSERT_EQ(1U, secondToken.size());
+ for (auto& killToken : secondToken) {
+ auto unusedSheckedOutSessionForKill(
+ catalog()->checkOutSessionForKill(_opCtx, std::move(killToken)));
+ }
+ futures[1].get();
+ }
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 95cbf2dd09b..17d440103a2 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -1051,29 +1051,11 @@ void TransactionParticipant::abortArbitraryTransaction() {
_abortTransactionOnSession(lock);
}
-void TransactionParticipant::abortArbitraryTransactionIfExpired() {
+bool TransactionParticipant::expired() const {
stdx::lock_guard<stdx::mutex> lock(_mutex);
- if (!_txnState.isInProgress(lock) || !_transactionExpireDate ||
- _transactionExpireDate >= Date_t::now()) {
- return;
- }
-
- const auto* session = getTransactionParticipant.owner(this);
- auto currentOperation = session->currentOperation();
- if (currentOperation) {
- // If an operation is still running for this transaction when it expires, kill the currently
- // running operation.
- stdx::lock_guard<Client> clientLock(*currentOperation->getClient());
- getGlobalServiceContext()->killOperation(currentOperation, ErrorCodes::ExceededTimeLimit);
- }
-
- // Log after killing the current operation because jstests may wait to see this log message to
- // imply that the operation has been killed.
- log() << "Aborting transaction with txnNumber " << _activeTxnNumber << " on session with lsid "
- << session->getSessionId().getId()
- << " because it has been running for longer than 'transactionLifetimeLimitSeconds'";
- _abortTransactionOnSession(lock);
+ return _txnState.isInProgress(lock) && _transactionExpireDate &&
+ _transactionExpireDate < getGlobalServiceContext()->getPreciseClockSource()->now();
}
void TransactionParticipant::abortActiveTransaction(OperationContext* opCtx) {
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index a9830833c17..921acbbe3a4 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -321,12 +321,9 @@ public:
void abortArbitraryTransaction();
/**
- * Same as abortArbitraryTransaction, except only executes if _transactionExpireDate indicates
- * that the transaction has expired.
- *
- * Not called with session checked out.
+ * Returns whether the transaction has exceeded its expiration time.
*/
- void abortArbitraryTransactionIfExpired();
+ bool expired() const;
/*
* Aborts the transaction inside the transaction, releasing transaction resources.
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 70c5fe92ce2..d8e68714408 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -1111,8 +1111,9 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) {
ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
- txnParticipant->abortArbitraryTransactionIfExpired();
- ASSERT_FALSE(txnParticipant->transactionIsAborted());
+ ASSERT(!txnParticipant->expired());
+ txnParticipant->abortArbitraryTransaction();
+ ASSERT(!txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
}