From 632105d33ab6931e86626db08160800d41bb1329 Mon Sep 17 00:00:00 2001 From: Cheahuychou Mao Date: Fri, 30 Jul 2021 06:51:46 +0000 Subject: SERVER-58751 Support internal sessions --- jstests/sharding/internal_sessions.js | 74 ++ src/mongo/db/SConscript | 1 + src/mongo/db/logical_session_id.cpp | 17 + src/mongo/db/logical_session_id.h | 13 +- src/mongo/db/logical_session_id.idl | 20 + src/mongo/db/logical_session_id_helpers.cpp | 14 +- src/mongo/db/logical_session_id_helpers.h | 5 + .../process_interface/mongos_process_interface.cpp | 2 +- src/mongo/db/session.h | 12 +- src/mongo/db/session_catalog.cpp | 183 +++-- src/mongo/db/session_catalog.h | 71 +- src/mongo/db/session_catalog_test.cpp | 765 ++++++++++++++------- 12 files changed, 865 insertions(+), 312 deletions(-) create mode 100644 jstests/sharding/internal_sessions.js diff --git a/jstests/sharding/internal_sessions.js b/jstests/sharding/internal_sessions.js new file mode 100644 index 00000000000..f51a124890a --- /dev/null +++ b/jstests/sharding/internal_sessions.js @@ -0,0 +1,74 @@ +/* + * Tests basic support for internal sessions. + * + * @tags: [requires_fcv_51] + */ +(function() { +'use strict'; + +TestData.disableImplicitSessions = true; + +const st = new ShardingTest({shards: 1}); +const shard0Primary = st.rs0.getPrimary(); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const testDB = st.s.getDB(kDbName); + +const kConfigTxnNs = "config.transactions"; + +assert.commandWorked(st.s.adminCommand({enableSharding: kDbName})); +st.ensurePrimaryShard(kDbName, st.shard0.shardName); + +// Verify that parent and child sessions are tracked using different config.transactions documents. +const sessionUUID = UUID(); + +const lsid0 = { + id: sessionUUID +}; +assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 0}], + ordered: false, + lsid: lsid0, + txnNumber: NumberLong(0) +})); +assert.neq(null, shard0Primary.getCollection(kConfigTxnNs).findOne({"_id.id": sessionUUID})); + +const lsid1 = { + id: sessionUUID, + txnNumber: NumberLong(35), + stmtId: NumberInt(0) +}; +assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 1}], + ordered: false, + lsid: lsid1, + txnNumber: NumberLong(0) +})); +assert.neq(null, shard0Primary.getCollection(kConfigTxnNs).findOne({ + "_id.id": sessionUUID, + "_id.txnNumber": lsid1.txnNumber, + "_id.stmtId": lsid1.stmtId +})); + +const lsid2 = { + id: sessionUUID, + txnUUID: UUID() +}; +assert.commandWorked(testDB.runCommand({ + insert: kCollName, + documents: [{x: 2}], + ordered: false, + lsid: lsid2, + txnNumber: NumberLong(35) +})); +assert.neq(null, + shard0Primary.getCollection(kConfigTxnNs) + .findOne({"_id.id": sessionUUID, "_id.txnUUID": lsid2.txnUUID})); + +assert.eq(3, shard0Primary.getCollection(kConfigTxnNs).count({"_id.id": sessionUUID})); + +st.stop(); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 6c1fd4eb364..f6296a79e5c 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -649,6 +649,7 @@ env.Library( LIBDEPS=[ 'kill_sessions', 'logical_session_id', + 'logical_session_id_helpers', ], LIBDEPS_PRIVATE=[ 'service_context', diff --git a/src/mongo/db/logical_session_id.cpp b/src/mongo/db/logical_session_id.cpp index 64e67b7a1a7..964d562bce5 100644 --- a/src/mongo/db/logical_session_id.cpp +++ b/src/mongo/db/logical_session_id.cpp @@ -42,6 +42,23 @@ LogicalSessionId makeLogicalSessionIdForTest() { return lsid; } +LogicalSessionId makeLogicalSessionIdWithTxnNumberForTest( + boost::optional parentLsid, boost::optional stmtId) { + auto lsid = parentLsid ? LogicalSessionId(parentLsid->getId(), parentLsid->getUid()) + : makeLogicalSessionIdForTest(); + lsid.getInternalSessionFields().setTxnNumber(0); + lsid.getInternalSessionFields().setStmtId(stmtId ? *stmtId : 0); + return lsid; +} + +LogicalSessionId makeLogicalSessionIdWithTxnUUIDForTest( + boost::optional parentLsid) { + auto lsid = parentLsid ? LogicalSessionId(parentLsid->getId(), parentLsid->getUid()) + : makeLogicalSessionIdForTest(); + lsid.getInternalSessionFields().setTxnUUID(UUID::gen()); + return lsid; +} + LogicalSessionRecord makeLogicalSessionRecordForTest() { LogicalSessionRecord record{}; diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 781f2e13ecb..01d74fa7cc4 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -58,9 +58,9 @@ constexpr Minutes kLogicalSessionDefaultTimeout = Minutes(kLocalLogicalSessionTimeoutMinutesDefault); inline bool operator==(const LogicalSessionId& lhs, const LogicalSessionId& rhs) { - auto makeEqualityLens = [](const auto& lsid) { return std::tie(lsid.getId(), lsid.getUid()); }; - - return makeEqualityLens(lhs) == makeEqualityLens(rhs); + return (lhs.getId() == rhs.getId()) && (lhs.getUid() == rhs.getUid()) && + (lhs.getTxnNumber() == rhs.getTxnNumber()) && (lhs.getStmtId() == rhs.getStmtId()) && + (lhs.getTxnUUID() == rhs.getTxnUUID()); } inline bool operator!=(const LogicalSessionId& lhs, const LogicalSessionId& rhs) { @@ -77,6 +77,13 @@ inline bool operator!=(const LogicalSessionRecord& lhs, const LogicalSessionReco LogicalSessionId makeLogicalSessionIdForTest(); +LogicalSessionId makeLogicalSessionIdWithTxnNumberForTest( + boost::optional parentLsid = boost::none, + boost::optional stmtId = boost::none); + +LogicalSessionId makeLogicalSessionIdWithTxnUUIDForTest( + boost::optional parentLsid = boost::none); + LogicalSessionRecord makeLogicalSessionRecordForTest(); struct LogicalSessionIdHash { diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index 50bd156f130..1f356d5f046 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -56,10 +56,28 @@ types: deserializer: "mongo::BSONElement::_numberInt" structs: + InternalSessionFields: + description: "Internal sessiond id fields" + strict: true + fields: + txnNumber: + description: "Used for internal sessions only." + type: TxnNumber + optional: true + stmtId: + description: "Used for internal sessions only." + type: StmtId + optional: true + txnUUID: + description: "Used for internal sessions only." + type: uuid + optional: true LogicalSessionId: description: "A struct representing a LogicalSessionId" strict: true + chained_structs: + InternalSessionFields: InternalSessionFields fields: id: uuid uid: sha256Block @@ -92,6 +110,8 @@ structs: LogicalSessionFromClient: description: "A struct representing a LogicalSessionId from external clients" strict: true + chained_structs: + InternalSessionFields: InternalSessionFields fields: id: uuid uid: diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp index 7577494b4a9..6ec83fe10bc 100644 --- a/src/mongo/db/logical_session_id_helpers.cpp +++ b/src/mongo/db/logical_session_id_helpers.cpp @@ -75,12 +75,22 @@ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) { return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())}); } +boost::optional getParentSessionId(const LogicalSessionId& sessionId) { + if (sessionId.getTxnNumber() || sessionId.getTxnUUID()) { + return LogicalSessionId{sessionId.getId(), sessionId.getUid()}; + } + return boost::none; +} + LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& fromClient, OperationContext* opCtx, std::initializer_list allowSpoof) { LogicalSessionId lsid; lsid.setId(fromClient.getId()); + lsid.getInternalSessionFields().setTxnNumber(fromClient.getTxnNumber()); + lsid.getInternalSessionFields().setStmtId(fromClient.getStmtId()); + lsid.getInternalSessionFields().setTxnUUID(fromClient.getTxnUUID()); if (fromClient.getUid()) { auto authSession = AuthorizationSession::get(opCtx->getClient()); @@ -203,9 +213,7 @@ namespace logical_session_id_helpers { void serializeLsidAndTxnNumber(OperationContext* opCtx, BSONObjBuilder* builder) { OperationSessionInfo sessionInfo; - if (opCtx->getLogicalSessionId()) { - sessionInfo.setSessionId(*opCtx->getLogicalSessionId()); - } + sessionInfo.setSessionId(opCtx->getLogicalSessionId()); sessionInfo.setTxnNumber(opCtx->getTxnNumber()); sessionInfo.serialize(builder); } diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h index 920a8764864..f2c01603cec 100644 --- a/src/mongo/db/logical_session_id_helpers.h +++ b/src/mongo/db/logical_session_id_helpers.h @@ -49,6 +49,11 @@ SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* o */ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db); +/** + * Returns the parent session id for the given session id if there is one. + */ +boost::optional getParentSessionId(const LogicalSessionId& sessionId); + /** * Factory functions to generate logical session records. */ diff --git a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp index 1e42994af7f..35c6febec29 100644 --- a/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp +++ b/src/mongo/db/pipeline/process_interface/mongos_process_interface.cpp @@ -260,7 +260,7 @@ void MongosProcessInterface::_reportCurrentOpsForIdleSessions(OperationContext* : KillAllSessionsByPatternSet{{}}); sessionCatalog->scanSessions({std::move(sessionFilter)}, [&](const ObservableSession& session) { - if (!session.currentOperation()) { + if (!session.hasCurrentOperation()) { auto op = TransactionRouter::get(session).reportState(opCtx, false /* sessionIsActive */); if (!op.isEmpty()) { diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h index b26eb7cdb19..050e74822cc 100644 --- a/src/mongo/db/session.h +++ b/src/mongo/db/session.h @@ -65,13 +65,19 @@ private: // The id of the session with which this object is associated const LogicalSessionId _sessionId; + // These fields are only safe to read or write while holding the SessionCatalog::_mutex. In + // practice, it is only used inside of the SessionCatalog itself. + // A pointer back to the currently running operation on this Session, or nullptr if there // is no operation currently running for the Session. - // - // This field is only safe to read or write while holding the SessionCatalog::_mutex. In - // practice, it is only used inside of the SessionCatalog itself. OperationContext* _checkoutOpCtx{nullptr}; + // A pointer to the operation currently running on one of the child Sessions of this Session, + // or nullptr if this is Session does not have any child Session or if there is no operation + // currently running on any of its child Sessions. Used to block this Session and other child + // Sessions from being checked out if there is already a checked-out child Session. + OperationContext* _childSessionCheckoutOpCtx{nullptr}; + // Keeps the last time this session was checked-out Date_t _lastCheckout{Date_t::now()}; diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 22cbd7c01d9..581d65aa27c 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -52,7 +52,7 @@ SessionCatalog::~SessionCatalog() { stdx::lock_guard lg(_mutex); for (const auto& entry : _sessions) { ObservableSession session(lg, entry.second->session); - invariant(!session.currentOperation()); + invariant(!session.hasCurrentOperation()); invariant(!session._killed()); } } @@ -71,59 +71,120 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) { return &sessionTransactionTable; } -SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) { - // This method is not supposed to be called with an already checked-out session due to risk of - // deadlock - invariant(opCtx->getLogicalSessionId()); - invariant(!operationSessionDecoration(opCtx)); - invariant(!opCtx->lockState()->inAWriteUnitOfWork()); - invariant(!opCtx->lockState()->isLocked()); +SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithParentSession( + OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional killToken) { + if (killToken) { + invariant(killToken->lsidToKill == lsid); + } else { + invariant(opCtx->getLogicalSessionId() == lsid); + } stdx::unique_lock ul(_mutex); - auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, *opCtx->getLogicalSessionId()); + auto parentSri = _getOrCreateSessionRuntimeInfo(ul, *getParentSessionId(lsid)); + auto childSri = _getOrCreateSessionRuntimeInfo(ul, lsid); + + if (killToken) { + invariant(ObservableSession(ul, childSri->session)._killed()); + } // Wait until the session is no longer checked out and until the previously scheduled kill has // completed - ++sri->numWaitingToCheckOut; - ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); - - opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() { - ObservableSession osession(ul, sri->session); - return !osession.currentOperation() && !osession._killed(); + ++parentSri->numWaitingToCheckOut; + ++childSri->numWaitingToCheckOut; + ON_BLOCK_EXIT([&] { + --parentSri->numWaitingToCheckOut; + --childSri->numWaitingToCheckOut; }); - sri->session._checkoutOpCtx = opCtx; - sri->session._lastCheckout = Date_t::now(); + // Wait on the parent session's condition variable since if the parent session is checked out + // prior to this, the child session's condition variable will not be notified when the parent + // session becomes available; on the other hand, if the child session is checked out prior to + // this, both parent session's and child session's condition variables will be notified when the + // child session and parent session become available. + opCtx->waitForConditionOrInterrupt( + parentSri->availableCondVar, + ul, + [&ul, &opCtx, &parentSri, &childSri, forKill = killToken.has_value()]() { + ObservableSession oParentSession(ul, parentSri->session); + ObservableSession oChildSession(ul, childSri->session); + auto isParentSessionAvailable = oParentSession._isAvailableForCheckOut(forKill); + auto isChildSessionAvailable = oChildSession._isAvailableForCheckOut(forKill); + if (isParentSessionAvailable) { + invariant(isChildSessionAvailable || oChildSession._killed()); + } + return isParentSessionAvailable && isChildSessionAvailable; + }); + + parentSri->session._childSessionCheckoutOpCtx = opCtx; + parentSri->session._lastCheckout = Date_t::now(); + + childSri->session._checkoutOpCtx = opCtx; + childSri->session._lastCheckout = Date_t::now(); return ScopedCheckedOutSession( - *this, std::move(sri), boost::none /* Not checked out for kill */); + *this, std::move(childSri), std::move(parentSri), std::move(killToken)); } -SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, - KillToken killToken) { - // This method is not supposed to be called with an already checked-out session due to risk of - // deadlock - invariant(!operationSessionDecoration(opCtx)); - invariant(!opCtx->getTxnNumber()); +SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionWithoutParentSession( + OperationContext* opCtx, const LogicalSessionId& lsid, boost::optional killToken) { + if (killToken) { + invariant(killToken->lsidToKill == lsid); + } else { + invariant(opCtx->getLogicalSessionId() == lsid); + } stdx::unique_lock ul(_mutex); - auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, killToken.lsidToKill); - invariant(ObservableSession(ul, sri->session)._killed()); - // Wait until the session is no longer checked out + auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid); + if (killToken) { + invariant(ObservableSession(ul, sri->session)._killed()); + } + + // Wait until the session is no longer checked out and until the previously scheduled kill has + // completed. ++sri->numWaitingToCheckOut; ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; }); - opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri] { - ObservableSession osession(ul, sri->session); - return !osession.currentOperation(); - }); + opCtx->waitForConditionOrInterrupt( + sri->availableCondVar, ul, [&ul, &sri, forKill = killToken.has_value()]() { + ObservableSession osession(ul, sri->session); + return osession._isAvailableForCheckOut(forKill); + }); sri->session._checkoutOpCtx = opCtx; sri->session._lastCheckout = Date_t::now(); - return SessionToKill(ScopedCheckedOutSession(*this, std::move(sri), std::move(killToken))); + return ScopedCheckedOutSession(*this, std::move(sri), nullptr, std::move(killToken)); +} + +SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(OperationContext* opCtx) { + // This method is not supposed to be called with an already checked-out session due to risk of + // deadlock + invariant(opCtx->getLogicalSessionId()); + invariant(!operationSessionDecoration(opCtx)); + invariant(!opCtx->lockState()->inAWriteUnitOfWork()); + invariant(!opCtx->lockState()->isLocked()); + + auto lsid = *opCtx->getLogicalSessionId(); + if (getParentSessionId(lsid)) { + return _checkOutSessionWithParentSession(opCtx, lsid, boost::none /* killToken */); + } + return _checkOutSessionWithoutParentSession(opCtx, lsid, boost::none /* killToken */); +} + +SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, + KillToken killToken) { + // This method is not supposed to be called with an already checked-out session due to risk of + // deadlock + invariant(!operationSessionDecoration(opCtx)); + invariant(!opCtx->getTxnNumber()); + + auto lsid = killToken.lsidToKill; + if (getParentSessionId(lsid)) { + return SessionToKill(_checkOutSessionWithParentSession(opCtx, lsid, std::move(killToken))); + } + return SessionToKill(_checkOutSessionWithoutParentSession(opCtx, lsid, std::move(killToken))); } void SessionCatalog::scanSession(const LogicalSessionId& lsid, @@ -138,8 +199,7 @@ void SessionCatalog::scanSession(const LogicalSessionId& lsid, ObservableSession osession(lg, sri->session); workerFn(osession); - if (osession._markedForReap && !osession._killed() && !osession.currentOperation() && - !sri->numWaitingToCheckOut) { + if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { sessionToReap = std::move(sri); _sessions.erase(it); } @@ -167,8 +227,7 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, workerFn(osession); - if (osession._markedForReap && !osession._killed() && - !osession.currentOperation() && !sri->numWaitingToCheckOut) { + if (osession._shouldBeReaped(sri->numWaitingToCheckOut)) { sessionsToReap.emplace_back(std::move(sri)); _sessions.erase(it++); } @@ -179,10 +238,9 @@ void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& lsid) { stdx::lock_guard lg(_mutex); - auto it = _sessions.find(lsid); - uassert(ErrorCodes::NoSuchSession, "Session not found", it != _sessions.end()); - auto& sri = it->second; + auto sri = _getSessionRuntimeInfo(lg, lsid); + uassert(ErrorCodes::NoSuchSession, "Session not found", sri); return ObservableSession(lg, sri->session).kill(); } @@ -191,17 +249,27 @@ size_t SessionCatalog::size() const { return _sessions.size(); } -SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo( - WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) { +SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo( + WithLock, const LogicalSessionId& lsid) { auto it = _sessions.find(lsid); if (it == _sessions.end()) { - it = _sessions.emplace(lsid, std::make_unique(lsid)).first; + return nullptr; + } + return it->second.get(); +} + +SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo( + WithLock lk, const LogicalSessionId& lsid) { + if (auto sri = _getSessionRuntimeInfo(lk, lsid)) { + return sri; } + auto it = _sessions.emplace(lsid, std::make_unique(lsid)).first; return it->second.get(); } void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, + SessionRuntimeInfo* parentSri, boost::optional killToken) { stdx::lock_guard lg(_mutex); @@ -209,6 +277,19 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, // operation context (meaning checked-out) invariant(_sessions[sri->session.getSessionId()].get() == sri); invariant(sri->session._checkoutOpCtx); + if (killToken) { + invariant(killToken->lsidToKill == sri->session.getSessionId()); + } + + auto parentLsid = getParentSessionId(sri->session.getSessionId()); + if (parentSri) { + invariant(parentLsid); + invariant(parentSri->session._childSessionCheckoutOpCtx == sri->session._checkoutOpCtx); + parentSri->session._childSessionCheckoutOpCtx = nullptr; + parentSri->availableCondVar.notify_all(); + } else { + invariant(!parentLsid); + } sri->session._checkoutOpCtx = nullptr; sri->availableCondVar.notify_all(); @@ -228,10 +309,20 @@ SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) cons // For currently checked-out sessions, interrupt the operation context so that the current owner // can release the session - if (firstKiller && _session->_checkoutOpCtx) { - invariant(_clientLock); - const auto serviceContext = _session->_checkoutOpCtx->getServiceContext(); - serviceContext->killOperation(_clientLock, _session->_checkoutOpCtx, reason); + if (firstKiller && hasCurrentOperation()) { + if (_session->_checkoutOpCtx) { + invariant(_clientLock); + const auto serviceContext = _session->_checkoutOpCtx->getServiceContext(); + serviceContext->killOperation(_clientLock, _session->_checkoutOpCtx, reason); + } + if (_session->_childSessionCheckoutOpCtx) { + // TODO (SERVER-58755): Test killing a session while its child session is being checked + // out, and after its child session has been checked out. + stdx::unique_lock clientLock{ + *_session->_childSessionCheckoutOpCtx->getClient()}; + const auto serviceContext = _session->_childSessionCheckoutOpCtx->getServiceContext(); + serviceContext->killOperation(clientLock, _session->_childSessionCheckoutOpCtx, reason); + } } return SessionCatalog::KillToken(getSessionId()); diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 4c8a550cdae..7bed6ae91a3 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -34,6 +34,7 @@ #include "mongo/db/client.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/session.h" #include "mongo/db/session_killer.h" @@ -135,23 +136,40 @@ private: }; using SessionRuntimeInfoMap = LogicalSessionIdMap>; + /** + * Blocking method, which checks-out the session with the given 'lsid'. + */ + ScopedCheckedOutSession _checkOutSessionWithParentSession(OperationContext* opCtx, + const LogicalSessionId& lsid, + boost::optional killToken); + ScopedCheckedOutSession _checkOutSessionWithoutParentSession( + OperationContext* opCtx, + const LogicalSessionId& lsid, + boost::optional killToken); + /** * Blocking method, which checks-out the session set on 'opCtx'. */ ScopedCheckedOutSession _checkOutSession(OperationContext* opCtx); /** - * Creates or returns the session runtime info for 'lsid' from the '_sessions' map. The returned - * pointer is guaranteed to be linked on the map for as long as the mutex is held. + * Returns the session runtime info for 'lsid' from the '_sessions' map. The returned pointer + * is guaranteed to be linked on the map for as long as the mutex is held. */ - SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock, - OperationContext* opCtx, - const LogicalSessionId& lsid); + SessionRuntimeInfo* _getSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid); + + /** + * Creates or returns the session runtime info for 'lsid' from the '_sessions' map. The + * returned pointer is guaranteed to be linked on the map for as long as the mutex is held. + */ + SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid); /** * Makes a session, previously checked out through 'checkoutSession', available again. */ - void _releaseSession(SessionRuntimeInfo* sri, boost::optional killToken); + void _releaseSession(SessionRuntimeInfo* sri, + SessionRuntimeInfo* parentSri, + boost::optional killToken); // Protects the state below mutable Mutex _mutex = @@ -169,12 +187,25 @@ class SessionCatalog::ScopedCheckedOutSession { public: ScopedCheckedOutSession(SessionCatalog& catalog, SessionCatalog::SessionRuntimeInfo* sri, + SessionCatalog::SessionRuntimeInfo* parentSri, boost::optional killToken) - : _catalog(catalog), _sri(sri), _killToken(std::move(killToken)) {} + : _catalog(catalog), _sri(sri), _parentSri(parentSri), _killToken(std::move(killToken)) { + if (_parentSri) { + invariant(getParentSessionId(_sri->session.getSessionId()) == + _parentSri->session.getSessionId()); + } + if (_killToken) { + invariant(_sri->session.getSessionId() == _killToken->lsidToKill); + } + } ScopedCheckedOutSession(ScopedCheckedOutSession&& other) - : _catalog(other._catalog), _sri(other._sri), _killToken(std::move(other._killToken)) { + : _catalog(other._catalog), + _sri(other._sri), + _parentSri(other._parentSri), + _killToken(std::move(other._killToken)) { other._sri = nullptr; + other._parentSri = nullptr; } ScopedCheckedOutSession& operator=(ScopedCheckedOutSession&&) = delete; @@ -183,7 +214,7 @@ public: ~ScopedCheckedOutSession() { if (_sri) { - _catalog._releaseSession(_sri, std::move(_killToken)); + _catalog._releaseSession(_sri, _parentSri, std::move(_killToken)); } } @@ -204,6 +235,7 @@ private: SessionCatalog& _catalog; SessionCatalog::SessionRuntimeInfo* _sri; + SessionCatalog::SessionRuntimeInfo* _parentSri; boost::optional _killToken; }; @@ -260,11 +292,10 @@ public: } /** - * Returns a pointer to the current operation running on this Session, or nullptr if there is no - * operation currently running on this Session. + * Returns true if there is an operation currently running on this Session. */ - OperationContext* currentOperation() const { - return _session->_checkoutOpCtx; + bool hasCurrentOperation() const { + return _session->_checkoutOpCtx || _session->_childSessionCheckoutOpCtx; } /** @@ -324,6 +355,20 @@ private: */ bool _killed() const; + /** + * Returns true if this Session can be checked out. + */ + bool _isAvailableForCheckOut(bool forKill) const { + return !hasCurrentOperation() && (forKill || !_killed()); + } + + /** + * Returns true if this Session should be be deleted from the map. + */ + bool _shouldBeReaped(int numWaitingToCheckOut) const { + return _markedForReap && !_killed() && !hasCurrentOperation() && !numWaitingToCheckOut; + } + Session* _session; stdx::unique_lock _clientLock; bool _markedForReap{false}; diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index a83e6d4c240..a89ee2561c1 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -27,16 +27,21 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + #include "mongo/platform/basic.h" #include +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/db/session_catalog.h" +#include "mongo/logv2/log.h" #include "mongo/stdx/future.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -81,6 +86,107 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSession) { ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); } +TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSessionWithTxnNumber) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid = makeLogicalSessionIdWithTxnNumberForTest(parentLsid); + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSessionWithTxnUUID) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid = makeLogicalSessionIdWithTxnNumberForTest(parentLsid); + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckOutParentSessionOfCheckedOutSession) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + // Verify that the parent session cannot be checked out until the child session is checked + // back in. + auto future = stdx::async(stdx::launch::async, [this, parentLsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(parentLsid); + OperationContextSession ocs(opCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckOutChildSessionOfCheckedOutSession) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + _opCtx->setLogicalSessionId(parentLsid); + OperationContextSession ocs(_opCtx); + + // Verify that the child session cannot be checked out until the parent session is checked + // back in. + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(opCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckoutMultipleChildSessionsConcurrently) { + auto runTest = [&](const LogicalSessionId& childLsid0, const LogicalSessionId& childLsid1) { + _opCtx->setLogicalSessionId(childLsid0); + OperationContextSession ocs(_opCtx); + + // Verify that another child session cannot be checked out until both the child session + // above and the parent session are checked back in. + auto future = stdx::async(stdx::launch::async, [this, childLsid1] { + ThreadClient tc(getServiceContext()); + auto childSessionOpCtx1 = cc().makeOperationContext(); + childSessionOpCtx1->setLogicalSessionId(childLsid1); + OperationContextSession ocs(childSessionOpCtx1.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(makeLogicalSessionIdWithTxnNumberForTest(parentLsid, 0), + makeLogicalSessionIdWithTxnNumberForTest(parentLsid, 1)); + runTest(makeLogicalSessionIdWithTxnUUIDForTest(parentLsid), + makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); + runTest(makeLogicalSessionIdWithTxnNumberForTest(parentLsid), + makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) { _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); const TxnNumber txnNum = 20; @@ -93,36 +199,46 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) { } TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - - { - OperationContextSession outerScopedSession(_opCtx); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); { - DirectClientSetter inDirectClient(_opCtx); - OperationContextSession innerScopedSession(_opCtx); - - auto session = OperationContextSession::get(_opCtx); - ASSERT(session); - ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + OperationContextSession outerScopedSession(_opCtx); + + { + DirectClientSetter inDirectClient(_opCtx); + OperationContextSession innerScopedSession(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + } + + { + DirectClientSetter inDirectClient(_opCtx); + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + } } - { - DirectClientSetter inDirectClient(_opCtx); - auto session = OperationContextSession::get(_opCtx); - ASSERT(session); - ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); - } - } + ASSERT(!OperationContextSession::get(_opCtx)); + }; - ASSERT(!OperationContextSession::get(_opCtx)); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, ScanSession) { - // Create three sessions in the catalog. - const std::vector lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -146,16 +262,24 @@ TEST_F(SessionCatalogTest, ScanSession) { ASSERT_EQ(lsids[2], session.get()->getSessionId()); }); + catalog()->scanSession(lsids[3], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[3], session.get()->getSessionId()); + }); + catalog()->scanSession(makeLogicalSessionIdForTest(), [](const ObservableSession&) { FAIL("The callback was called for non-existent session"); }); } TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { - // Create three sessions in the catalog. - const std::vector lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -178,8 +302,15 @@ TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { ASSERT_EQ(lsids[1], session.get()->getSessionId()); }); - catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) { - ASSERT_EQ(lsids[2], session.get()->getSessionId()); + catalog()->scanSession(lsids[2], + [&lsids](ObservableSession& session) { session.markForReap(); }); + + catalog()->scanSession(lsids[2], [](const ObservableSession&) { + FAIL("The callback was called for non-existent session"); + }); + + catalog()->scanSession(lsids[3], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[3], session.get()->getSessionId()); }); } @@ -196,10 +327,14 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ASSERT(lsidsFound.empty()); lsidsFound.clear(); - // Create three sessions in the catalog. - const std::vector lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -213,7 +348,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { // Scan over all Sessions. catalog()->scanSessions(matcherAllSessions, workerFn); - ASSERT_EQ(3U, lsidsFound.size()); + ASSERT_EQ(4U, lsidsFound.size()); lsidsFound.clear(); // Scan over all Sessions, visiting a particular Session. @@ -223,13 +358,30 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ASSERT_EQ(1U, lsidsFound.size()); ASSERT_EQ(lsids[2], lsidsFound.front()); lsidsFound.clear(); + + // Scan over all Sessions, visiting a Session with child Sessions. + SessionKiller::Matcher matcherLSID1( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsids[1])}); + catalog()->scanSessions(matcherLSID1, workerFn); + ASSERT_EQ(1U, lsidsFound.size()); + ASSERT_EQ(lsids[1], lsidsFound.front()); + // TODO (SERVER-58755): Make the SessionKiller::Matcher also return the child sessions. + // ASSERT_EQ(3U, lsidsFound.size()); + // ASSERT_EQ(lsids[1], lsidsFound[0]); + // ASSERT_EQ(lsids[2], lsidsFound[1]); + // ASSERT_EQ(lsids[3], lsidsFound[2]); + lsidsFound.clear(); } TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { - // Create three sessions in the catalog. - const std::vector lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); unittest::Barrier sessionsCheckedOut(2); unittest::Barrier sessionsCheckedIn(2); @@ -244,7 +396,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { }); // After this wait, session 1 is checked-out and waiting on the barrier, because of which only - // sessions 0 and 2 will be reaped + // sessions 0, 2 and 3 will be reaped. sessionsCheckedOut.countDownAndWait(); SessionKiller::Matcher matcherAllSessions( @@ -267,177 +419,278 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { } TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Create the session so there is something to kill - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + auto killToken = catalog()->killSession(lsid); - auto killToken = catalog()->killSession(lsid); + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } - // Make sure that regular session check-out will fail because the session is marked as killed - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(sideOpCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); - // Schedule a separate "regular operation" thread, which will block on checking-out the session, - // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [this, lsid] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(sideOpCtx.get()); - }); - ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } - // Make sure that "for kill" session check-out succeeds - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Make sure that session check-out after kill succeeds again - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; - // Make sure the "regular operation" eventually is able to proceed and use the just killed - // session - future.get(); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + auto killToken = [this, &lsid] { + // Create the session so there is something to kill + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession operationContextSession(opCtx.get()); - auto killToken = [this, &lsid] { - // Create the session so there is something to kill - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession operationContextSession(opCtx.get()); + auto killToken = catalog()->killSession(lsid); - auto killToken = catalog()->killSession(lsid); + // Make sure the owning operation context is interrupted + ASSERT_THROWS_CODE( + opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); - // Make sure the owning operation context is interrupted - ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); + // Make sure that the checkOutForKill call will wait for the owning operation context to + // check the session back in + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + OperationContextSession ocs(sideOpCtx.get()); + }); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); - // Make sure that the checkOutForKill call will wait for the owning operation context to - // check the session back in + return killToken; + }(); + + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out auto future = stdx::async(stdx::launch::async, [this, lsid] { ThreadClient tc(getServiceContext()); auto sideOpCtx = Client::getCurrent()->makeOperationContext(); sideOpCtx->setLogicalSessionId(lsid); - sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); OperationContextSession ocs(sideOpCtx.get()); }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } - ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - return killToken; - }(); + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; - // Make sure that regular session check-out will fail because the session is marked as killed - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); +} - // Schedule a separate "regular operation" thread, which will block on checking-out the session, - // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [this, lsid] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(sideOpCtx.get()); - }); - ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); +TEST_F(SessionCatalogTest, KillParentSessionWhenChildSessionIsCheckedOut) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + auto killToken = [this, &parentLsid, &childLsid] { + // Create the session so there is something to kill + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession operationContextSession(opCtx.get()); - // Make sure that "for kill" session check-out succeeds - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + auto killToken = catalog()->killSession(parentLsid); - // Make sure that session check-out after kill succeeds again - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + // Make sure the owning operation context is interrupted + ASSERT_THROWS_CODE( + opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); - // Make sure the "regular operation" eventually is able to proceed and use the just killed - // session - future.get(); + // Make sure that the checkOutForKill call will wait for the owning operation context to + // check the session back in + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(childLsid); + sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + OperationContextSession ocs(sideOpCtx.get()); + }); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); + + return killToken; + }(); + + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(sideOpCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(opCtx.get()); + } + + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); } TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Create the session so there is something to kill - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + auto killToken1 = catalog()->killSession(lsid); + auto killToken2 = catalog()->killSession(lsid); - auto killToken1 = catalog()->killSession(lsid); - auto killToken2 = catalog()->killSession(lsid); + // Make sure that regular session check-out will fail because there are two killers on the + // session + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } - // Make sure that regular session check-out will fail because there are two killers on the - // session - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + boost::optional killTokenWhileSessionIsCheckedOutForKill; - boost::optional killTokenWhileSessionIsCheckedOutForKill; + // Finish the first killer of the session + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - // Finish the first killer of the session - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + // Killing a session while checked out for kill should not affect the killers + killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid)); + } - // Killing a session while checked out for kill should not affect the killers - killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid)); - } + // Regular session check-out should still fail because there are now still two killers on + // the session + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + { + auto opCtx = makeOperationContext(); + auto scopedSession = catalog()->checkOutSessionForKill( + opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + }; - // Regular session check-out should still fail because there are now still two killers on the - // session - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill( - opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) { @@ -447,41 +700,56 @@ TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) { } TEST_F(SessionCatalogTestWithDefaultOpCtx, SessionDiscarOperationContextAfterCheckIn) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); - { - OperationContextSession ocs(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); + { + OperationContextSession ocs(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + } - OperationContextSession::checkIn(_opCtx); ASSERT(!OperationContextSession::get(_opCtx)); - } + }; - ASSERT(!OperationContextSession::get(_opCtx)); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTestWithDefaultOpCtx, SessionDiscarOperationContextAfterCheckInCheckOut) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); - { - OperationContextSession ocs(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); + { + OperationContextSession ocs(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); - OperationContextSession::checkIn(_opCtx); - ASSERT(!OperationContextSession::get(_opCtx)); + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); - OperationContextSession::checkOut(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); - } + OperationContextSession::checkOut(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); + } - ASSERT(!OperationContextSession::get(_opCtx)); + ASSERT(!OperationContextSession::get(_opCtx)); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { - // Create three sessions - const std::vector lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdWithTxnNumberForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(); + return {lsid0, lsid1, lsid2}; + }(); std::vector> futures; unittest::Barrier firstUseOfTheSessionReachedBarrier(lsids.size() + 1); @@ -556,62 +824,73 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { // Even if the implementaion has a bug, the test may not always fail depending on thread // scheduling, however, this test case still gives us a good coverage. TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) { - auto lsid = makeLogicalSessionIdForTest(); - _opCtx->setLogicalSessionId(lsid); - - stdx::future normalCheckOutFinish, killCheckOutFinish; + auto runTest = [&](const LogicalSessionId& lsid) { + auto client = getServiceContext()->makeClient("ConcurrentCheckOutAndKill"); + AlternativeClientRegion acr(client); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(lsid); - // This variable is protected by the session check-out. - std::string lastSessionCheckOut = "first session"; - { - // Check out the session to block both normal check-out and checkOutForKill. - OperationContextSession firstCheckOut(_opCtx); + stdx::future normalCheckOutFinish, killCheckOutFinish; - // Normal check out should start after kill. - normalCheckOutFinish = stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession normalCheckOut(sideOpCtx.get()); - ASSERT_EQ("session kill", lastSessionCheckOut); - lastSessionCheckOut = "session checkout"; - }); + // This variable is protected by the session check-out. + std::string lastSessionCheckOut = "first session"; + { + // Check out the session to block both normal check-out and checkOutForKill. + OperationContextSession firstCheckOut(opCtx.get()); - // Kill will short-cut the queue and be the next one to check out. - killCheckOutFinish = stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); + // Normal check out should start after kill. + normalCheckOutFinish = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + OperationContextSession normalCheckOut(sideOpCtx.get()); + ASSERT_EQ("session kill", lastSessionCheckOut); + lastSessionCheckOut = "session checkout"; + }); - // Kill the session - std::vector killTokens; - catalog()->scanSessions( - SessionKiller::Matcher( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(sideOpCtx.get())}), - [&killTokens](const ObservableSession& session) { - killTokens.emplace_back(session.kill(ErrorCodes::InternalError)); - }); - ASSERT_EQ(1U, killTokens.size()); - auto checkOutSessionForKill( - catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0]))); - ASSERT_EQ("first session", lastSessionCheckOut); - lastSessionCheckOut = "session kill"; - }); + // Kill will short-cut the queue and be the next one to check out. + killCheckOutFinish = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + // Kill the session + std::vector killTokens; + catalog()->scanSessions( + SessionKiller::Matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsid)}), + [&killTokens](const ObservableSession& session) { + killTokens.emplace_back(session.kill(ErrorCodes::InternalError)); + }); + + ASSERT_EQ(1U, killTokens.size()); + auto checkOutSessionForKill( + catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0]))); + + ASSERT_EQ("first session", lastSessionCheckOut); + lastSessionCheckOut = "session kill"; + }); - // The main thread won't check in the session until it's killed. - { - auto m = MONGO_MAKE_LATCH(); - stdx::condition_variable cond; - stdx::unique_lock lock(m); - ASSERT_THROWS_CODE( - _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }), - DBException, - ErrorCodes::InternalError); + // The main thread won't check in the session until it's killed. + { + auto m = MONGO_MAKE_LATCH(); + stdx::condition_variable cond; + stdx::unique_lock lock(m); + ASSERT_THROWS_CODE( + opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }), + DBException, + ErrorCodes::InternalError); + } } - } - normalCheckOutFinish.get(); - killCheckOutFinish.get(); - ASSERT_EQ("session checkout", lastSessionCheckOut); + normalCheckOutFinish.get(); + killCheckOutFinish.get(); + + ASSERT_EQ("session checkout", lastSessionCheckOut); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } } // namespace -- cgit v1.2.1