diff options
Diffstat (limited to 'src/mongo')
4 files changed, 96 insertions, 52 deletions
diff --git a/src/mongo/db/session/session_catalog_mongod.cpp b/src/mongo/db/session/session_catalog_mongod.cpp index 94b5584a146..ff31b202e3f 100644 --- a/src/mongo/db/session/session_catalog_mongod.cpp +++ b/src/mongo/db/session/session_catalog_mongod.cpp @@ -49,7 +49,6 @@ #include "mongo/db/session/sessions_collection.h" #include "mongo/db/transaction/transaction_participant.h" #include "mongo/logv2/log.h" -#include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/transaction_router.h" #include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/fail_point.h" @@ -140,7 +139,10 @@ void disallowDirectWritesUnderSession(OperationContext* opCtx) { * not removed because they were in use. */ LogicalSessionIdSet removeExpiredTransactionSessionsNotInUseFromMemory( - OperationContext* opCtx, SessionsCollection& sessionsCollection, Date_t possiblyExpired) { + OperationContext* opCtx, + MongoDSessionCatalogTransactionInterface* ti, + SessionsCollection& sessionsCollection, + Date_t possiblyExpired) { const auto catalog = SessionCatalog::get(opCtx); // Find the possibly expired logical session ids in the in-memory catalog. @@ -170,55 +172,8 @@ LogicalSessionIdSet removeExpiredTransactionSessionsNotInUseFromMemory( TxnNumber parentSessionActiveTxnNumber; const auto transactionSessionIdsNotReaped = catalog->scanSessionsForReap( expiredLogicalSessionId, - [&](ObservableSession& parentSession) { - const auto transactionSessionId = parentSession.getSessionId(); - const auto txnParticipant = TransactionParticipant::get(parentSession); - const auto txnRouter = TransactionRouter::get(parentSession); - - parentSessionActiveTxnNumber = - txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(); - if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { - LOGV2_DEBUG(6753702, - 5, - "Marking parent transaction session for reap", - "lsid"_attr = transactionSessionId); - // This is an external session so it can be reaped if and only if all of its - // internal sessions can be reaped. - parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); - } - }, - [&](ObservableSession& childSession) { - const auto transactionSessionId = childSession.getSessionId(); - const auto txnParticipant = TransactionParticipant::get(childSession); - const auto txnRouter = TransactionRouter::get(childSession); - - if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { - if (isInternalSessionForNonRetryableWrite(transactionSessionId)) { - LOGV2_DEBUG(6753703, - 5, - "Marking child transaction session for reap", - "lsid"_attr = transactionSessionId); - // This is an internal session for a non-retryable write so it can be reaped - // independently of the external session that write ran in. - childSession.markForReap(ObservableSession::ReapMode::kExclusive); - } else if (isInternalSessionForRetryableWrite(transactionSessionId)) { - LOGV2_DEBUG(6753704, - 5, - "Marking child transaction session for reap", - "lsid"_attr = transactionSessionId); - // This is an internal session for a retryable write so it must be reaped - // atomically with the external session and internal sessions for that - // retryable write, unless the write is no longer active (i.e. there is - // already a retryable write or transaction with a higher txnNumber). - childSession.markForReap(*transactionSessionId.getTxnNumber() < - parentSessionActiveTxnNumber - ? ObservableSession::ReapMode::kExclusive - : ObservableSession::ReapMode::kNonExclusive); - } else { - MONGO_UNREACHABLE; - } - } - }); + ti->makeParentSessionWorkerFnForReap(&parentSessionActiveTxnNumber), + ti->makeChildSessionWorkerFnForReap(parentSessionActiveTxnNumber)); expiredTransactionSessionIdsStillInUse.insert(transactionSessionIdsNotReaped.begin(), transactionSessionIdsNotReaped.end()); } @@ -699,7 +654,7 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, Date_t possiblyExpired) { const auto expiredTransactionSessionIdsStillInUse = removeExpiredTransactionSessionsNotInUseFromMemory( - opCtx, sessionsCollection, possiblyExpired); + opCtx, _ti.get(), sessionsCollection, possiblyExpired); // The "unsafe" check for primary below is a best-effort attempt to ensure that the on-disk // state reaping code doesn't run if the node is secondary and cause log spam. It is a work diff --git a/src/mongo/db/session/session_catalog_mongod_transaction_interface.h b/src/mongo/db/session/session_catalog_mongod_transaction_interface.h index 2cde4d52750..fd800245c7e 100644 --- a/src/mongo/db/session/session_catalog_mongod_transaction_interface.h +++ b/src/mongo/db/session/session_catalog_mongod_transaction_interface.h @@ -30,6 +30,7 @@ #pragma once #include "mongo/db/operation_context.h" +#include "mongo/db/session/session_catalog.h" // for ScanSessionsCallbackFn #include "mongo/db/transaction/transaction_participant.h" // for SessionToKill namespace mongo { @@ -41,6 +42,8 @@ namespace mongo { */ class MongoDSessionCatalogTransactionInterface { public: + using ScanSessionsCallbackFn = SessionCatalog::ScanSessionsCallbackFn; + virtual ~MongoDSessionCatalogTransactionInterface() = default; /** @@ -48,6 +51,23 @@ public: * externally, such as through a direct write to the transactions table. */ virtual void invalidateSessionToKill(OperationContext* opCtx, const SessionToKill& session) = 0; + + /** + * Returns a 'parentSessionWorkerFn' that can be passed to + * SessionCatalog::scanSessionsForReap(). + * + * Accepts an output parameter for the parent session's TxnNumber. + */ + virtual ScanSessionsCallbackFn makeParentSessionWorkerFnForReap( + TxnNumber* parentSessionActiveTxnNumber) = 0; + + /** + * Returns a 'childSessionWorkerFn' that can be passed to SessionCatalog::scanSessionsForReap(). + * + * Accepts a reference to the parent session's TxnNumber. + */ + virtual ScanSessionsCallbackFn makeChildSessionWorkerFnForReap( + const TxnNumber& parentSessionActiveTxnNumber) = 0; }; } // namespace mongo diff --git a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp index 3c570f34a48..0c944cfffb4 100644 --- a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp +++ b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp @@ -30,6 +30,10 @@ #include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h" #include "mongo/db/transaction/transaction_participant.h" +#include "mongo/logv2/log.h" +#include "mongo/s/transaction_router.h" + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction namespace mongo { @@ -39,4 +43,63 @@ void MongoDSessionCatalogTransactionInterfaceImpl::invalidateSessionToKill( participant.invalidate(opCtx); } +MongoDSessionCatalogTransactionInterface::ScanSessionsCallbackFn +MongoDSessionCatalogTransactionInterfaceImpl::makeParentSessionWorkerFnForReap( + TxnNumber* parentSessionActiveTxnNumber) { + return [parentSessionActiveTxnNumber](ObservableSession& parentSession) { + const auto transactionSessionId = parentSession.getSessionId(); + const auto txnParticipant = TransactionParticipant::get(parentSession); + const auto txnRouter = TransactionRouter::get(parentSession); + + *parentSessionActiveTxnNumber = + txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber(); + if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { + LOGV2_DEBUG(6753702, + 5, + "Marking parent transaction session for reap", + "lsid"_attr = transactionSessionId); + // This is an external session so it can be reaped if and only if all of its + // internal sessions can be reaped. + parentSession.markForReap(ObservableSession::ReapMode::kNonExclusive); + } + }; +} + +MongoDSessionCatalogTransactionInterface::ScanSessionsCallbackFn +MongoDSessionCatalogTransactionInterfaceImpl::makeChildSessionWorkerFnForReap( + const TxnNumber& parentSessionActiveTxnNumber) { + return [&parentSessionActiveTxnNumber](ObservableSession& childSession) { + const auto transactionSessionId = childSession.getSessionId(); + const auto txnParticipant = TransactionParticipant::get(childSession); + const auto txnRouter = TransactionRouter::get(childSession); + + if (txnParticipant.canBeReaped() && txnRouter.canBeReaped()) { + if (isInternalSessionForNonRetryableWrite(transactionSessionId)) { + LOGV2_DEBUG(6753703, + 5, + "Marking child transaction session for reap", + "lsid"_attr = transactionSessionId); + // This is an internal session for a non-retryable write so it can be reaped + // independently of the external session that write ran in. + childSession.markForReap(ObservableSession::ReapMode::kExclusive); + } else if (isInternalSessionForRetryableWrite(transactionSessionId)) { + LOGV2_DEBUG(6753704, + 5, + "Marking child transaction session for reap", + "lsid"_attr = transactionSessionId); + // This is an internal session for a retryable write so it must be reaped + // atomically with the external session and internal sessions for that + // retryable write, unless the write is no longer active (i.e. there is + // already a retryable write or transaction with a higher txnNumber). + childSession.markForReap(*transactionSessionId.getTxnNumber() < + parentSessionActiveTxnNumber + ? ObservableSession::ReapMode::kExclusive + : ObservableSession::ReapMode::kNonExclusive); + } else { + MONGO_UNREACHABLE; + } + } + }; +} + } // namespace mongo diff --git a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h index 6d7751cb885..8fdd82ab8dc 100644 --- a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h +++ b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h @@ -48,6 +48,12 @@ public: virtual ~MongoDSessionCatalogTransactionInterfaceImpl() = default; void invalidateSessionToKill(OperationContext* opCtx, const SessionToKill& session) override; + + ScanSessionsCallbackFn makeParentSessionWorkerFnForReap( + TxnNumber* parentSessionActiveTxnNumber) override; + + ScanSessionsCallbackFn makeChildSessionWorkerFnForReap( + const TxnNumber& parentSessionActiveTxnNumber) override; }; } // namespace mongo |