summaryrefslogtreecommitdiff
path: root/src/mongo/db/session_catalog.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-09 08:17:38 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-10-31 04:50:08 -0400
commit236c6c28a18210586673097ee436c5b613b6c46f (patch)
tree9c26586d5943845b8f3356cbbee41dc75533670d /src/mongo/db/session_catalog.cpp
parente701da7ff3ec84b2bb3b353fa748c22f7b2a5878 (diff)
downloadmongo-236c6c28a18210586673097ee436c5b613b6c46f.tar.gz
SERVER-37244 Make sessions killable outside of the Session/TransactionParticipant object
Diffstat (limited to 'src/mongo/db/session_catalog.cpp')
-rw-r--r--src/mongo/db/session_catalog.cpp98
1 files changed, 46 insertions, 52 deletions
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index abc5165e5a3..70049de30a4 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -34,12 +34,8 @@
#include "mongo/db/session_catalog.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/transaction_participant.h"
-#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -58,6 +54,7 @@ SessionCatalog::~SessionCatalog() {
for (const auto& entry : _sessions) {
auto& sri = entry.second;
invariant(!sri->session.currentOperation());
+ invariant(!sri->session.killed());
}
}
@@ -88,6 +85,31 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx)
stdx::unique_lock<stdx::mutex> ul(_mutex);
auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ // Wait until the session is no longer checked out and until the previously scheduled kill has
+ // completed
+ opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&sri]() {
+ return !sri->session.currentOperation() && !sri->session.killed();
+ });
+
+ sri->session._markCheckedOut(ul, opCtx);
+
+ return ScopedCheckedOutSession(
+ opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */);
+}
+
+ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
+ Session::KillToken killToken) {
+ // This method is not supposed to be called with an already checked-out session due to risk of
+ // deadlock
+ invariant(!operationSessionDecoration(opCtx));
+ invariant(!opCtx->getTxnNumber());
+
+ const auto lsid = killToken.lsidToKill;
+
+ stdx::unique_lock<stdx::mutex> ul(_mutex);
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
+ invariant(sri->session.killed());
+
// Wait until the session is no longer checked out
opCtx->waitForConditionOrInterrupt(
sri->availableCondVar, ul, [&sri]() { return !sri->session.currentOperation(); });
@@ -95,7 +117,8 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx)
invariant(!sri->session.currentOperation());
sri->session._markCheckedOut(ul, opCtx);
- return ScopedCheckedOutSession(opCtx, ScopedSession(std::move(sri)));
+ return ScopedCheckedOutSession(
+ opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */);
}
ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
@@ -112,49 +135,6 @@ ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
return ss;
}
-void SessionCatalog::invalidateSessions(OperationContext* opCtx,
- boost::optional<BSONObj> singleSessionDoc) {
- auto replCoord = repl::ReplicationCoordinator::get(opCtx);
- bool isReplSet = replCoord->getReplicationMode() == repl::ReplicationCoordinator::modeReplSet;
- if (isReplSet) {
- uassert(40528,
- str::stream() << "Direct writes against "
- << NamespaceString::kSessionTransactionsTableNamespace.ns()
- << " cannot be performed using a transaction or on a session.",
- !opCtx->getLogicalSessionId());
- }
-
- const auto invalidateSessionFn = [&](WithLock, decltype(_sessions)::iterator it) {
- auto& sri = it->second;
- auto const txnParticipant =
- TransactionParticipant::getFromNonCheckedOutSession(&sri->session);
- txnParticipant->invalidate();
-
- // We cannot remove checked-out sessions from the cache, because operations expect to find
- // them there to check back in
- if (!sri->session.currentOperation()) {
- _sessions.erase(it);
- }
- };
-
- stdx::lock_guard<stdx::mutex> lg(_mutex);
-
- if (singleSessionDoc) {
- const auto lsid = LogicalSessionId::parse(IDLParserErrorContext("lsid"),
- singleSessionDoc->getField("_id").Obj());
-
- auto it = _sessions.find(lsid);
- if (it != _sessions.end()) {
- invalidateSessionFn(lg, it);
- }
- } else {
- auto it = _sessions.begin();
- while (it != _sessions.end()) {
- invalidateSessionFn(lg, it++);
- }
- }
-}
-
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
@@ -163,15 +143,22 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
for (auto& sessionEntry : _sessions) {
if (matcher.match(sessionEntry.first)) {
- workerFn(&sessionEntry.second->session);
+ workerFn(lg, &sessionEntry.second->session);
}
}
}
+Session::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ auto it = _sessions.find(lsid);
+ uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end());
+
+ auto& sri = it->second;
+ return sri->session.kill(lg);
+}
+
std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreateSessionRuntimeInfo(
WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) {
- invariant(!opCtx->lockState()->inAWriteUnitOfWork());
-
auto it = _sessions.find(lsid);
if (it == _sessions.end()) {
it = _sessions.emplace(lsid, std::make_shared<SessionRuntimeInfo>(lsid)).first;
@@ -180,7 +167,8 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate
return it->second;
}
-void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
+void SessionCatalog::_releaseSession(const LogicalSessionId& lsid,
+ boost::optional<Session::KillToken> killToken) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
auto it = _sessions.find(lsid);
@@ -191,6 +179,11 @@ void SessionCatalog::_releaseSession(const LogicalSessionId& lsid) {
sri->session._markCheckedIn(lg);
sri->availableCondVar.notify_one();
+
+ if (killToken) {
+ invariant(sri->session.killed());
+ sri->session._markNotKilled(lg, std::move(*killToken));
+ }
}
OperationContextSession::OperationContextSession(OperationContext* opCtx, bool checkOutSession)
@@ -236,6 +229,7 @@ OperationContextSession::~OperationContextSession() {
// SessionCatalog mutex, and other code may take the Client lock while holding that mutex.
stdx::unique_lock<Client> lk(*_opCtx->getClient());
ScopedCheckedOutSession sessionToDelete(std::move(checkedOutSession.get()));
+
// This destroys the moved-from ScopedCheckedOutSession, and must be done within the client
// lock.
checkedOutSession = boost::none;