summaryrefslogtreecommitdiff
path: root/src/mongo/db/session_catalog.cpp
diff options
context:
space:
mode:
authorCheahuychou Mao <mao.cheahuychou@gmail.com>2022-04-13 20:14:37 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-04-13 20:53:37 +0000
commit87393ce9bcfe06f8aa93b856474fb77bfb3a5267 (patch)
tree2f8da9e5f696453e54e4479a653155bcafd72278 /src/mongo/db/session_catalog.cpp
parent8a6ccfc1aedfd0006fe4ec4fefd2e7f23abc5e8f (diff)
downloadmongo-87393ce9bcfe06f8aa93b856474fb77bfb3a5267.tar.gz
SERVER-62479 Reap sessions for the same retryable write atomically
Diffstat (limited to 'src/mongo/db/session_catalog.cpp')
-rw-r--r--src/mongo/db/session_catalog.cpp351
1 files changed, 162 insertions, 189 deletions
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index f9f0648002d..eaf13b6a4fa 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -53,10 +53,10 @@ const auto operationSessionDecoration =
SessionCatalog::~SessionCatalog() {
stdx::lock_guard<Latch> lg(_mutex);
- for (const auto& entry : _sessions) {
- ObservableSession session(lg, entry.second->session);
- invariant(!session.hasCurrentOperation());
- invariant(!session._killed());
+ for (const auto& [_, sri] : _sessions) {
+ ObservableSession osession(lg, sri.get(), &sri->parentSession);
+ invariant(!osession.hasCurrentOperation());
+ invariant(!osession._killed());
}
}
@@ -74,95 +74,39 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) {
return &sessionTransactionTable;
}
-SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithParentSession(
+SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner(
OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional<KillToken> killToken) {
if (killToken) {
invariant(killToken->lsidToKill == lsid);
- invariant(killToken->parentLsidToKill);
- invariant(*killToken->parentLsidToKill == *getParentSessionId(lsid));
} else {
invariant(opCtx->getLogicalSessionId() == lsid);
}
stdx::unique_lock<Latch> ul(_mutex);
- auto parentSri = _getOrCreateSessionRuntimeInfo(ul, *getParentSessionId(lsid), nullptr);
- auto childSri = _getOrCreateSessionRuntimeInfo(ul, lsid, parentSri);
+ auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid);
+ auto session = sri->getSession(lsid);
+ invariant(session);
if (killToken) {
- invariant(ObservableSession(ul, childSri->session)._killed());
- invariant(ObservableSession(ul, parentSri->session)._killed());
- }
-
- // Wait until the session is no longer checked out and until the previously scheduled kill has
- // completed
- ++parentSri->numWaitingToCheckOut;
- ++childSri->numWaitingToCheckOut;
- ON_BLOCK_EXIT([&] {
- --parentSri->numWaitingToCheckOut;
- --childSri->numWaitingToCheckOut;
- });
-
- // Wait on the parent session's condition variable since if the parent session is checked out
- // prior to this, the child session's condition variable will not be notified when the parent
- // session becomes available; on the other hand, if the child session is checked out prior to
- // this, both parent session's and child session's condition variables will be notified when the
- // child session and parent session become available.
- opCtx->waitForConditionOrInterrupt(
- parentSri->availableCondVar,
- ul,
- [&ul, &opCtx, &parentSri, &childSri, forKill = killToken.has_value()]() {
- ObservableSession oParentSession(ul, parentSri->session);
- ObservableSession oChildSession(ul, childSri->session);
- auto isParentSessionAvailable = oParentSession._isAvailableForCheckOut(forKill);
- auto isChildSessionAvailable = oChildSession._isAvailableForCheckOut(forKill);
- if (isParentSessionAvailable) {
- invariant(isChildSessionAvailable || oChildSession._killed());
- }
- return isParentSessionAvailable && isChildSessionAvailable;
- });
-
- parentSri->session._childSessionCheckoutOpCtx = opCtx;
- parentSri->session._lastCheckout = Date_t::now();
-
- childSri->session._checkoutOpCtx = opCtx;
- childSri->session._lastCheckout = Date_t::now();
-
- return ScopedCheckedOutSession(
- *this, std::move(childSri), std::move(parentSri), std::move(killToken));
-}
-
-SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithoutParentSession(
- OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional<KillToken> killToken) {
- if (killToken) {
- invariant(killToken->lsidToKill == lsid);
- invariant(!killToken->parentLsidToKill);
- } else {
- invariant(opCtx->getLogicalSessionId() == lsid);
- }
-
- stdx::unique_lock<Latch> ul(_mutex);
-
- auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid, nullptr);
- if (killToken) {
- invariant(ObservableSession(ul, sri->session)._killed());
+ invariant(ObservableSession(ul, sri, session)._killed());
}
// Wait until the session is no longer checked out and until the previously scheduled kill has
// completed.
- ++sri->numWaitingToCheckOut;
- ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; });
+ ++session->_numWaitingToCheckOut;
+ ON_BLOCK_EXIT([&] { --session->_numWaitingToCheckOut; });
opCtx->waitForConditionOrInterrupt(
- sri->availableCondVar, ul, [&ul, &sri, forKill = killToken.has_value()]() {
- ObservableSession osession(ul, sri->session);
+ sri->availableCondVar, ul, [&ul, &sri, &session, forKill = killToken.has_value()]() {
+ ObservableSession osession(ul, sri, session);
return osession._isAvailableForCheckOut(forKill);
});
- sri->session._checkoutOpCtx = opCtx;
- sri->session._lastCheckout = Date_t::now();
+ sri->checkoutOpCtx = opCtx;
+ sri->lastCheckout = Date_t::now();
- return ScopedCheckedOutSession(*this, std::move(sri), nullptr, std::move(killToken));
+ return ScopedCheckedOutSession(*this, std::move(sri), session, std::move(killToken));
}
SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) {
@@ -174,10 +118,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(Operati
invariant(!opCtx->lockState()->isLocked());
auto lsid = *opCtx->getLogicalSessionId();
- if (getParentSessionId(lsid)) {
- return _checkOutSessionWithParentSession(opCtx, lsid, boost::none /* killToken */);
- }
- return _checkOutSessionWithoutParentSession(opCtx, lsid, boost::none /* killToken */);
+ return _checkOutSessionInner(opCtx, lsid, boost::none /* killToken */);
}
SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
@@ -188,58 +129,101 @@ SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationCo
invariant(!opCtx->getTxnNumber());
auto lsid = killToken.lsidToKill;
- if (getParentSessionId(lsid)) {
- return SessionToKill(_checkOutSessionWithParentSession(opCtx, lsid, std::move(killToken)));
- }
- return SessionToKill(_checkOutSessionWithoutParentSession(opCtx, lsid, std::move(killToken)));
+ return SessionToKill(_checkOutSessionInner(opCtx, lsid, std::move(killToken)));
}
void SessionCatalog::scanSession(const LogicalSessionId& lsid,
const ScanSessionsCallbackFn& workerFn) {
- std::unique_ptr<SessionRuntimeInfo> sessionToReap;
+ stdx::lock_guard<Latch> lg(_mutex);
- {
- stdx::lock_guard<Latch> lg(_mutex);
- auto it = _sessions.find(lsid);
- if (it != _sessions.end()) {
- auto& sri = it->second;
- ObservableSession osession(lg, sri->session);
+ if (auto sri = _getSessionRuntimeInfo(lg, lsid)) {
+ auto session = sri->getSession(lsid);
+ invariant(session);
+
+ ObservableSession osession(lg, sri, session);
+ workerFn(osession);
+ invariant(!osession._markedForReap, "Cannot reap a session via 'scanSession'");
+ }
+}
+
+void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
+ const ScanSessionsCallbackFn& workerFn) {
+ stdx::lock_guard<Latch> lg(_mutex);
+
+ LOGV2_DEBUG(21976,
+ 2,
+ "Scanning {sessionCount} sessions",
+ "Scanning sessions",
+ "sessionCount"_attr = _sessions.size());
+
+ for (auto& [parentLsid, sri] : _sessions) {
+ if (matcher.match(parentLsid)) {
+ ObservableSession osession(lg, sri.get(), &sri->parentSession);
workerFn(osession);
+ invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'");
+ }
- if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) {
- sessionToReap = std::move(sri);
- _sessions.erase(it);
+ for (auto& [childLsid, session] : sri->childSessions) {
+ if (matcher.match(childLsid)) {
+ ObservableSession osession(lg, sri.get(), &session);
+ workerFn(osession);
+ invariant(!osession._markedForReap, "Cannot reap a session via 'scanSessions'");
}
}
}
}
-void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
- const ScanSessionsCallbackFn& workerFn) {
- std::vector<std::unique_ptr<SessionRuntimeInfo>> sessionsToReap;
+LogicalSessionIdSet SessionCatalog::scanSessionsForReap(
+ const LogicalSessionId& parentLsid,
+ const ScanSessionsCallbackFn& parentSessionWorkerFn,
+ const ScanSessionsCallbackFn& childSessionWorkerFn) {
+ invariant(!getParentSessionId(parentLsid));
+ std::unique_ptr<SessionRuntimeInfo> sriToReap;
{
stdx::lock_guard<Latch> lg(_mutex);
- LOGV2_DEBUG(21976,
- 2,
- "Scanning {sessionCount} sessions",
- "Scanning sessions",
- "sessionCount"_attr = _sessions.size());
+ auto sriIt = _sessions.find(parentLsid);
+ // The reaper should never try to reap a non-existent session id.
+ invariant(sriIt != _sessions.end());
+ auto sri = sriIt->second.get();
- for (auto it = _sessions.begin(); it != _sessions.end(); ++it) {
- if (matcher.match(it->first)) {
- auto& sri = it->second;
- ObservableSession osession(lg, sri->session);
+ LogicalSessionIdSet remainingSessions;
+ bool shouldReapRemaining = true;
- workerFn(osession);
+ {
+ ObservableSession osession(lg, sri, &sri->parentSession);
+ parentSessionWorkerFn(osession);
- if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) {
- sessionsToReap.emplace_back(std::move(sri));
- _sessions.erase(it++);
+ remainingSessions.insert(osession.getSessionId());
+ shouldReapRemaining = osession._shouldBeReaped();
+ }
+
+ {
+ auto childSessionIt = sri->childSessions.begin();
+ while (childSessionIt != sri->childSessions.end()) {
+ ObservableSession osession(lg, sri, &childSessionIt->second);
+ childSessionWorkerFn(osession);
+
+ if (osession._shouldBeReaped() &&
+ (osession._reapMode == ObservableSession::ReapMode::kExclusive)) {
+ sri->childSessions.erase(childSessionIt++);
+ continue;
}
+
+ remainingSessions.insert(osession.getSessionId());
+ shouldReapRemaining &= osession._shouldBeReaped();
+ ++childSessionIt;
}
}
+
+ if (shouldReapRemaining) {
+ sriToReap = std::move(sriIt->second);
+ _sessions.erase(sriIt);
+ remainingSessions.clear();
+ }
+
+ return remainingSessions;
}
}
@@ -248,7 +232,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls
auto sri = _getSessionRuntimeInfo(lg, lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", sri);
- return ObservableSession(lg, sri->session).kill();
+ return ObservableSession(lg, sri, &sri->parentSession).kill();
}
size_t SessionCatalog::size() const {
@@ -258,133 +242,122 @@ size_t SessionCatalog::size() const {
void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) {
stdx::lock_guard<Latch> lg(_mutex);
- auto parentSri = [&]() -> SessionRuntimeInfo* {
- if (auto parentLsid = getParentSessionId(lsid)) {
- return _getOrCreateSessionRuntimeInfo(lg, *parentLsid, nullptr);
- }
- return nullptr;
- }();
- _getOrCreateSessionRuntimeInfo(lg, lsid, parentSri);
+ _getOrCreateSessionRuntimeInfo(lg, lsid);
}
SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo(
WithLock, const LogicalSessionId& lsid) {
- auto it = _sessions.find(lsid);
- if (it == _sessions.end()) {
+ auto parentLsid = castToParentSessionId(lsid);
+ auto sriIt = _sessions.find(parentLsid);
+
+ if (sriIt == _sessions.end()) {
return nullptr;
}
- return it->second.get();
+
+ auto sri = sriIt->second.get();
+ auto session = sri->getSession(lsid);
+
+ if (session) {
+ return sri;
+ }
+
+ return nullptr;
}
SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo(
- WithLock lk, const LogicalSessionId& lsid, SessionRuntimeInfo* parentSri) {
+ WithLock lk, const LogicalSessionId& lsid) {
if (auto sri = _getSessionRuntimeInfo(lk, lsid)) {
return sri;
}
- auto it = _sessions.emplace(lsid, std::make_unique<SessionRuntimeInfo>(lsid, parentSri)).first;
- return it->second.get();
+ auto parentLsid = castToParentSessionId(lsid);
+ auto sriIt =
+ _sessions.emplace(parentLsid, std::make_unique<SessionRuntimeInfo>(parentLsid)).first;
+ auto sri = sriIt->second.get();
+
+ if (getParentSessionId(lsid)) {
+ auto [childSessionIt, inserted] = sri->childSessions.try_emplace(lsid, lsid);
+ // Insert should always succeed since the session did not exist prior to this.
+ invariant(inserted);
+
+ auto& childSession = childSessionIt->second;
+ childSession._parentSession = &sri->parentSession;
+ }
+
+ return sri;
}
void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
- SessionRuntimeInfo* parentSri,
+ Session* session,
boost::optional<KillToken> killToken) {
stdx::lock_guard<Latch> lg(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
// operation context (meaning checked-out)
- invariant(_sessions[sri->session.getSessionId()].get() == sri);
- invariant(sri->session._checkoutOpCtx);
+ invariant(_sessions[sri->parentSession.getSessionId()].get() == sri);
+ invariant(sri->checkoutOpCtx);
if (killToken) {
- invariant(killToken->lsidToKill == sri->session.getSessionId());
-
- if (parentSri) {
- invariant(killToken->parentLsidToKill == parentSri->session.getSessionId());
- } else {
- invariant(!killToken->parentLsidToKill);
- }
+ invariant(killToken->lsidToKill == session->getSessionId());
}
- auto parentLsid = getParentSessionId(sri->session.getSessionId());
- if (parentSri) {
- invariant(parentLsid);
- invariant(parentSri->session._childSessionCheckoutOpCtx == sri->session._checkoutOpCtx);
- parentSri->session._childSessionCheckoutOpCtx = nullptr;
- parentSri->availableCondVar.notify_all();
- } else {
- invariant(!parentLsid);
- }
- sri->session._checkoutOpCtx = nullptr;
+ sri->checkoutOpCtx = nullptr;
sri->availableCondVar.notify_all();
if (killToken) {
- invariant(sri->session._killsRequested > 0);
- --sri->session._killsRequested;
-
- if (parentSri) {
- invariant(killToken->parentLsidToKill);
- invariant(parentSri->session._killsRequested > 0);
- --parentSri->session._killsRequested;
- }
+ invariant(sri->killsRequested > 0);
+ --sri->killsRequested;
}
}
-SessionCatalog::SessionRuntimeInfo::~SessionRuntimeInfo() {
- invariant(!numWaitingToCheckOut);
+Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) {
+ if (lsid == parentSession._sessionId) {
+ return &parentSession;
+ }
+
+ invariant(getParentSessionId(lsid) == parentSession._sessionId);
+ auto it = childSessions.find(lsid);
+ if (it == childSessions.end()) {
+ return nullptr;
+ }
+ return &it->second;
}
SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const {
- const bool firstKiller = (0 == _session->_killsRequested);
- ++_session->_killsRequested;
+ const bool firstKiller = (0 == _sri->killsRequested);
+ ++_sri->killsRequested;
// For currently checked-out sessions, interrupt the operation context so that the current owner
// can release the session
if (firstKiller && hasCurrentOperation()) {
- if (_session->_checkoutOpCtx) {
- invariant(_clientLock.owns_lock());
- invariant(!_session->_childSessionCheckoutOpCtx);
- const auto serviceContext = _session->_checkoutOpCtx->getServiceContext();
- serviceContext->killOperation(_clientLock, _session->_checkoutOpCtx, reason);
- } else if (_session->_childSessionCheckoutOpCtx) {
- // Both parent and child sessions can't be checked out at the same time, so _clientLock
- // should be empty, and we'll never take the child operation context's client lock while
- // already holding the parent's.
- invariant(!_clientLock.owns_lock());
- stdx::unique_lock<Client> childOpClientLock{
- *_session->_childSessionCheckoutOpCtx->getClient()};
- const auto serviceContext = _session->_childSessionCheckoutOpCtx->getServiceContext();
- serviceContext->killOperation(
- childOpClientLock, _session->_childSessionCheckoutOpCtx, reason);
- }
- }
-
- auto parentSession = _session->_parentSession;
- if (parentSession) {
- const bool firstParentKiller = (0 == parentSession->_killsRequested);
- ++parentSession->_killsRequested;
-
- if (firstParentKiller && parentSession->_checkoutOpCtx) {
- // Both parent and child sessions can't be checked out at the same time, so _clientLock
- // should be empty, and we'll never take the parent operation context's client lock
- // while already holding the child's.
- invariant(!_clientLock.owns_lock());
- stdx::unique_lock<Client> clientLock{*parentSession->_checkoutOpCtx->getClient()};
- const auto serviceContext = parentSession->_checkoutOpCtx->getServiceContext();
- serviceContext->killOperation(clientLock, parentSession->_checkoutOpCtx, reason);
- }
+ invariant(_clientLock.owns_lock());
+ const auto serviceContext = _sri->checkoutOpCtx->getServiceContext();
+ serviceContext->killOperation(_clientLock, _sri->checkoutOpCtx, reason);
}
- return SessionCatalog::KillToken(
- getSessionId(),
- parentSession ? boost::make_optional(parentSession->getSessionId()) : boost::none);
+ return SessionCatalog::KillToken(getSessionId());
}
-void ObservableSession::markForReap() {
+void ObservableSession::markForReap(ReapMode reapMode) {
+ if (!getParentSessionId(getSessionId())) {
+ // By design, parent sessions are only safe to be reaped if all of their child sessions are.
+ invariant(reapMode == ReapMode::kNonExclusive);
+ }
_markedForReap = true;
+ _reapMode.emplace(reapMode);
+}
+
+bool ObservableSession::_shouldBeReaped() const {
+ bool isCheckedOut = [&] {
+ if (_sri->checkoutOpCtx) {
+ return _sri->checkoutOpCtx->getLogicalSessionId() == getSessionId();
+ }
+ return false;
+ }();
+ return _markedForReap && !isCheckedOut && !get()->_numWaitingToCheckOut && !_killed();
}
bool ObservableSession::_killed() const {
- return _session->_killsRequested > 0;
+ return _sri->killsRequested > 0;
}
OperationContextSession::OperationContextSession(OperationContext* opCtx) : _opCtx(opCtx) {