summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/commands/killcursors_cmd.cpp2
-rw-r--r--src/mongo/db/cursor_manager.cpp4
-rw-r--r--src/mongo/db/kill_sessions_local.cpp28
-rw-r--r--src/mongo/db/kill_sessions_local.h8
-rw-r--r--src/mongo/db/session.cpp19
-rw-r--r--src/mongo/db/session.h16
-rw-r--r--src/mongo/db/session_catalog.cpp18
-rw-r--r--src/mongo/db/session_catalog.h10
-rw-r--r--src/mongo/db/session_catalog_test.cpp36
9 files changed, 108 insertions, 33 deletions
diff --git a/src/mongo/db/commands/killcursors_cmd.cpp b/src/mongo/db/commands/killcursors_cmd.cpp
index b883d2e2a05..453c703dfa4 100644
--- a/src/mongo/db/commands/killcursors_cmd.cpp
+++ b/src/mongo/db/commands/killcursors_cmd.cpp
@@ -89,7 +89,7 @@ private:
if (txnToAbort) {
auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort->first);
if (session) {
- (*session)->abortIfSnapshotRead(opCtx, txnToAbort->second);
+ (*session)->abortIfSnapshotRead(txnToAbort->second);
}
}
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index 08653cfda22..7f2b9fbe199 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -259,7 +259,7 @@ bool GlobalCursorIdCache::killCursor(OperationContext* opCtx, CursorId id, bool
if (txnToAbort) {
auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort->first);
if (session) {
- (*session)->abortIfSnapshotRead(opCtx, txnToAbort->second);
+ (*session)->abortIfSnapshotRead(txnToAbort->second);
}
}
@@ -318,7 +318,7 @@ std::size_t GlobalCursorIdCache::timeoutCursors(OperationContext* opCtx, Date_t
for (auto&& txnToAbort : txnsToAbort) {
auto session = SessionCatalog::get(opCtx)->getSession(opCtx, txnToAbort.first);
if (session) {
- (*session)->abortIfSnapshotRead(opCtx, txnToAbort.second);
+ (*session)->abortIfSnapshotRead(txnToAbort.second);
}
}
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 2bff47e89cc..fd206cca8fa 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -37,28 +37,34 @@
#include "mongo/db/kill_sessions_common.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/util/log.h"
namespace mongo {
-
-SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+namespace {
+void killSessionsLocalKillCursors(OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
auto res = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher);
- auto status = res.first;
+ uassertStatusOK(res.first);
+}
- if (status.isOK()) {
- return std::vector<HostAndPort>{};
- } else {
- return status;
- }
+void killSessionsLocalKillTransactions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
+ SessionCatalog::get(opCtx)->scanSessions(
+ opCtx, matcher, [](OperationContext* opCtx, Session* session) {
+ session->abortTransaction();
+ });
}
+} // namespace
SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
SessionKiller::UniformRandomBitGenerator* urbg) {
- uassertStatusOK(killSessionsLocalKillCursors(opCtx, matcher));
- return uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher));
+ killSessionsLocalKillCursors(opCtx, matcher);
+ uassertStatusOK(killSessionsLocalKillOps(opCtx, matcher));
+ killSessionsLocalKillTransactions(opCtx, matcher);
+ return {std::vector<HostAndPort>{}};
}
} // namespace mongo
diff --git a/src/mongo/db/kill_sessions_local.h b/src/mongo/db/kill_sessions_local.h
index 0785eb9d6a1..7177fad6b0d 100644
--- a/src/mongo/db/kill_sessions_local.h
+++ b/src/mongo/db/kill_sessions_local.h
@@ -28,17 +28,15 @@
#pragma once
-#include "mongo/db/kill_sessions.h"
-
#include "mongo/db/session_killer.h"
namespace mongo {
+/**
+ * Kills all cursors, ops, and transactions on mongod for sessions matching 'matcher'.
+ */
SessionKiller::Result killSessionsLocal(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
SessionKiller::UniformRandomBitGenerator* urbg);
-SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
-
} // namespace mongo
diff --git a/src/mongo/db/session.cpp b/src/mongo/db/session.cpp
index 1d0fc2e0de4..36640b93cda 100644
--- a/src/mongo/db/session.cpp
+++ b/src/mongo/db/session.cpp
@@ -598,7 +598,7 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
// to a higher _activeTxnNumber during the lifetime of a checkout. If that occurs, we abort
// the current transaction. Note that it would indicate a user bug to have a newer
// transaction on one shard while an older transaction is still active on another shard.
- _releaseStashedTransactionResources(lg, opCtx);
+ _releaseStashedTransactionResources(lg);
uasserted(ErrorCodes::TransactionAborted,
str::stream() << "Transaction aborted. Active txnNumber is now "
<< _activeTxnNumber);
@@ -617,19 +617,24 @@ void Session::unstashTransactionResources(OperationContext* opCtx) {
}
}
-void Session::abortIfSnapshotRead(OperationContext* opCtx, TxnNumber txnNumber) {
+void Session::abortIfSnapshotRead(TxnNumber txnNumber) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
if (_activeTxnNumber == txnNumber && _autocommit) {
- _releaseStashedTransactionResources(lg, opCtx);
+ _releaseStashedTransactionResources(lg);
+ _txnState = MultiDocumentTransactionState::kAborted;
}
}
-void Session::_releaseStashedTransactionResources(WithLock wl, OperationContext* opCtx) {
- if (opCtx->getWriteUnitOfWork()) {
- opCtx->setWriteUnitOfWork(nullptr);
- }
+void Session::abortTransaction() {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ _releaseStashedTransactionResources(lg);
+ _txnState = MultiDocumentTransactionState::kAborted;
+}
+void Session::_releaseStashedTransactionResources(WithLock wl) {
_txnResourceStash = boost::none;
+ _transactionOperations.clear();
+ _txnState = MultiDocumentTransactionState::kNone;
}
void Session::_beginOrContinueTxnOnMigration(WithLock wl, TxnNumber txnNumber) {
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 64d1aeefd88..bb866ff2946 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -208,14 +208,12 @@ public:
bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const;
/**
- * Transfers management of both locks and WiredTiger transaction from the OperationContext to
- * the Session.
+ * Transfers management of transaction resources from the OperationContext to the Session.
*/
void stashTransactionResources(OperationContext* opCtx);
/**
- * Transfers management of both locks and WiredTiger transaction from the Session to the
- * OperationContext.
+ * Transfers management of transaction resources from the Session to the OperationContext.
*/
void unstashTransactionResources(OperationContext* opCtx);
@@ -223,7 +221,12 @@ public:
* If there is transaction in progress with transaction number 'txnNumber' and _autocommit=true,
* aborts the transaction.
*/
- void abortIfSnapshotRead(OperationContext* opCtx, TxnNumber txnNumber);
+ void abortIfSnapshotRead(TxnNumber txnNumber);
+
+ /**
+ * Aborts the transaction, releasing stashed transaction resources.
+ */
+ void abortTransaction();
bool getAutocommit() const {
return _autocommit;
@@ -293,8 +296,7 @@ private:
std::vector<StmtId> stmtIdsWritten,
const repl::OpTime& lastStmtIdWriteTs);
- // Releases stashed locks and WiredTiger transaction. This implicitly aborts the transaction.
- void _releaseStashedTransactionResources(WithLock, OperationContext* opCtx);
+ void _releaseStashedTransactionResources(WithLock);
const LogicalSessionId _sessionId;
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 5ae80ffa849..35943ed935c 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -36,6 +36,7 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/kill_sessions_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
@@ -237,6 +238,23 @@ void SessionCatalog::invalidateSessions(OperationContext* opCtx,
}
}
+void SessionCatalog::scanSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher,
+ stdx::function<void(OperationContext*, Session*)> workerFn) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+
+ LOG(2) << "Beginning scanSessions. Scanning " << _txnTable.size() << " sessions.";
+
+ for (auto it = _txnTable.begin(); it != _txnTable.end(); ++it) {
+ // TODO SERVER-33850: Rename KillAllSessionsByPattern and
+ // ScopedKillAllSessionsByPatternImpersonator to not refer to session kill.
+ if (const KillAllSessionsByPattern* pattern = matcher.match(it->first)) {
+ ScopedKillAllSessionsByPatternImpersonator impersonator(opCtx, *pattern);
+ workerFn(opCtx, &(it->second->txnState));
+ }
+ }
+}
+
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo(
WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) {
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 6dbb5b4fda4..ddcea4db9fb 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -31,6 +31,7 @@
#include "mongo/base/disallow_copying.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/session.h"
+#include "mongo/db/session_killer.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
@@ -135,6 +136,15 @@ public:
*/
void invalidateSessions(OperationContext* opCtx, boost::optional<BSONObj> singleSessionDoc);
+ /**
+ * Iterates through the SessionCatalog and applies 'workerFn' to each Session. This locks the
+ * SessionCatalog.
+ * TODO SERVER-33850: Take Matcher out of the SessionKiller namespace.
+ */
+ void scanSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher,
+ stdx::function<void(OperationContext*, Session*)> workerFn);
+
private:
struct SessionRuntimeInfo {
SessionRuntimeInfo(LogicalSessionId lsid) : txnState(std::move(lsid)) {}
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 1dd009479fd..a02203e5554 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -278,5 +278,41 @@ TEST_F(SessionCatalogTest, OnlyCheckOutSessionWithCheckOutSessionTrue) {
}
}
+TEST_F(SessionCatalogTest, ScanSessions) {
+ std::vector<LogicalSessionId> lsids;
+ auto workerFn = [&](OperationContext* opCtx, Session* session) {
+ lsids.push_back(session->getSessionId());
+ };
+
+ // Scan over zero Sessions.
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx())});
+ catalog()->scanSessions(opCtx(), matcherAllSessions, workerFn);
+ ASSERT(lsids.empty());
+
+ // Create three sessions in the catalog.
+ auto lsid1 = makeLogicalSessionIdForTest();
+ auto lsid2 = makeLogicalSessionIdForTest();
+ auto lsid3 = makeLogicalSessionIdForTest();
+ {
+ auto scopedSession1 = catalog()->getOrCreateSession(opCtx(), lsid1);
+ auto scopedSession2 = catalog()->getOrCreateSession(opCtx(), lsid2);
+ auto scopedSession3 = catalog()->getOrCreateSession(opCtx(), lsid3);
+ }
+
+ // Scan over all Sessions.
+ lsids.clear();
+ catalog()->scanSessions(opCtx(), matcherAllSessions, workerFn);
+ ASSERT_EQ(lsids.size(), 3U);
+
+ // Scan over all Sessions, visiting a particular Session.
+ SessionKiller::Matcher matcherLSID2(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx(), lsid2)});
+ lsids.clear();
+ catalog()->scanSessions(opCtx(), matcherLSID2, workerFn);
+ ASSERT_EQ(lsids.size(), 1U);
+ ASSERT_EQ(lsids.front(), lsid2);
+}
+
} // namespace
} // namespace mongo