summary | refs | log | tree | commit | diff
path: root/src/mongo
diff options
context:
space:
mode:
author: Jack Mulrow <jack.mulrow@mongodb.com> 2023-05-09 20:27:32 +0000
committer: Evergreen Agent <no-reply@evergreen.mongodb.com> 2023-05-10 01:29:57 +0000
commit: c5bde8dfab169ae2e020eef3e3c1bab8c5f6e841 (patch)
tree: 6643c1a52b5cd35131d51b7a7620975b4f3c29bf /src/mongo
parent: 5e2159ea02bdfc56de4d5998f784f70e83777e0a (diff)
download: mongo-c5bde8dfab169ae2e020eef3e3c1bab8c5f6e841.tar.gz
SERVER-76368 Allow TransactionRouter on a mongod to early reap retryable sessions
Diffstat (limited to 'src/mongo')
-rw-r--r-- src/mongo/db/SConscript | 3
-rw-r--r-- src/mongo/db/mongod_main.cpp | 3
-rw-r--r-- src/mongo/db/repl/oplog_applier_impl_test.cpp | 4
-rw-r--r-- src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp | 4
-rw-r--r-- src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp | 4
-rw-r--r-- src/mongo/db/s/session_catalog_migration_destination_test.cpp | 4
-rw-r--r-- src/mongo/db/session/SConscript | 3
-rw-r--r-- src/mongo/db/session/internal_transactions_reap_service.cpp (renamed from src/mongo/db/transaction/internal_transactions_reap_service.cpp) | 4
-rw-r--r-- src/mongo/db/session/internal_transactions_reap_service.h (renamed from src/mongo/db/transaction/internal_transactions_reap_service.h) | 0
-rw-r--r-- src/mongo/db/session/internal_transactions_reap_service.idl (renamed from src/mongo/db/transaction/internal_transactions_reap_service.idl) | 0
-rw-r--r-- src/mongo/db/session/internal_transactions_reap_service_test.cpp (renamed from src/mongo/db/transaction/internal_transactions_reap_service_test.cpp) | 4
-rw-r--r-- src/mongo/db/session/session_catalog.cpp | 62
-rw-r--r-- src/mongo/db/session/session_catalog.h | 60
-rw-r--r-- src/mongo/db/session/session_catalog_mongod.cpp | 16
-rw-r--r-- src/mongo/db/session/session_catalog_mongod.h | 7
-rw-r--r-- src/mongo/db/session/session_catalog_mongod_transaction_interface.h | 7
-rw-r--r-- src/mongo/db/transaction/SConscript | 4
-rw-r--r-- src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp | 25
-rw-r--r-- src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h | 3
-rw-r--r-- src/mongo/db/transaction/transaction_participant.cpp | 4
-rw-r--r-- src/mongo/db/transaction/transaction_participant_test.cpp | 120
-rw-r--r-- src/mongo/s/transaction_router.cpp | 15
-rw-r--r-- src/mongo/s/transaction_router_test.cpp | 39
23 files changed, 284 insertions, 111 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 2d5d9447d8e..eca31451859 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -2624,6 +2624,7 @@ if wiredtiger:
'record_id_test.cpp',
'server_options_test.cpp',
'session/internal_session_pool_test.cpp',
+ 'session/internal_transactions_reap_service_test.cpp',
'session/logical_session_cache_test.cpp',
'session/logical_session_id_test.cpp',
'session/session_catalog_mongod_test.cpp',
@@ -2666,8 +2667,10 @@ if wiredtiger:
'$BUILD_DIR/mongo/db/query/common_query_enums_and_helpers',
'$BUILD_DIR/mongo/db/query/op_metrics',
'$BUILD_DIR/mongo/db/query/query_test_service_context',
+ '$BUILD_DIR/mongo/db/repl/image_collection_entry',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/repl_server_parameters',
+ '$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
'$BUILD_DIR/mongo/db/repl/replmocks',
'$BUILD_DIR/mongo/db/repl/storage_interface_impl',
'$BUILD_DIR/mongo/db/repl/tenant_migration_access_blocker',
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index 979085513fe..636852ed088 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -188,7 +188,6 @@
#include "mongo/db/storage/storage_parameters_gen.h"
#include "mongo/db/system_index.h"
#include "mongo/db/timeseries/timeseries_op_observer.h"
-#include "mongo/db/transaction/internal_transactions_reap_service.h"
#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
#include "mongo/db/transaction/transaction_participant.h"
#include "mongo/db/ttl.h"
@@ -1748,8 +1747,6 @@ int mongod_main(int argc, char* argv[]) {
setUpReplication(service);
setUpObservers(service);
service->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongod>(service));
- SessionCatalog::get(service)->setOnEagerlyReapedSessionsFn(
- InternalTransactionsReapService::onEagerlyReapedSessions);
ErrorExtraInfo::invariantHaveAllParsers();
diff --git a/src/mongo/db/repl/oplog_applier_impl_test.cpp b/src/mongo/db/repl/oplog_applier_impl_test.cpp
index 735a46e23e1..d23e244e40f 100644
--- a/src/mongo/db/repl/oplog_applier_impl_test.cpp
+++ b/src/mongo/db/repl/oplog_applier_impl_test.cpp
@@ -3699,10 +3699,6 @@ public:
// secondary index creation does not. We use an UnreplicatedWritesBlock to avoid
// timestamping any of the catalog setup.
repl::UnreplicatedWritesBlock noRep(_opCtx.get());
- MongoDSessionCatalog::set(
- _opCtx->getServiceContext(),
- std::make_unique<MongoDSessionCatalog>(
- std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
auto mongoDSessionCatalog = MongoDSessionCatalog::get(_opCtx.get());
mongoDSessionCatalog->onStepUp(_opCtx.get());
diff --git a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
index 16cc18c2fe6..096a9292ec0 100644
--- a/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_oplog_fetcher_test.cpp
@@ -115,10 +115,6 @@ public:
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(getServiceContext(),
std::make_unique<repl::StorageInterfaceImpl>());
- MongoDSessionCatalog::set(
- getServiceContext(),
- std::make_unique<MongoDSessionCatalog>(
- std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
auto mongoDSessionCatalog = MongoDSessionCatalog::get(operationContext());
mongoDSessionCatalog->onStepUp(operationContext());
LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
diff --git a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
index 418657ec6b3..0f1f6a72b98 100644
--- a/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
+++ b/src/mongo/db/s/resharding/resharding_txn_cloner_test.cpp
@@ -108,10 +108,6 @@ class ReshardingTxnClonerTest : public ShardServerTestFixture {
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(getServiceContext(),
std::make_unique<repl::StorageInterfaceImpl>());
- MongoDSessionCatalog::set(
- getServiceContext(),
- std::make_unique<MongoDSessionCatalog>(
- std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
auto mongoDSessionCatalog = MongoDSessionCatalog::get(operationContext());
mongoDSessionCatalog->onStepUp(operationContext());
LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 6e54739ffe8..09bbd62441b 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -139,10 +139,6 @@ public:
// onStepUp() relies on the storage interface to create the config.transactions table.
repl::StorageInterface::set(getServiceContext(),
std::make_unique<repl::StorageInterfaceImpl>());
- MongoDSessionCatalog::set(
- getServiceContext(),
- std::make_unique<MongoDSessionCatalog>(
- std::make_unique<MongoDSessionCatalogTransactionInterfaceImpl>()));
auto mongoDSessionCatalog = MongoDSessionCatalog::get(operationContext());
mongoDSessionCatalog->onStepUp(operationContext());
LogicalSessionCache::set(getServiceContext(), std::make_unique<LogicalSessionCacheNoop>());
diff --git a/src/mongo/db/session/SConscript b/src/mongo/db/session/SConscript
index 32ea42a2691..8e7d243c93b 100644
--- a/src/mongo/db/session/SConscript
+++ b/src/mongo/db/session/SConscript
@@ -175,6 +175,8 @@ env.Library(
env.Library(
target='session_catalog_mongod',
source=[
+ 'internal_transactions_reap_service.cpp',
+ 'internal_transactions_reap_service.idl',
'session_catalog_mongod.cpp',
'session_txn_record.idl',
],
@@ -186,6 +188,7 @@ env.Library(
'$BUILD_DIR/mongo/db/internal_transactions_feature_flag',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/db/repl/repl_server_parameters',
+ '$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
'$BUILD_DIR/mongo/db/repl/storage_interface',
'$BUILD_DIR/mongo/db/server_base',
'$BUILD_DIR/mongo/db/service_context',
diff --git a/src/mongo/db/transaction/internal_transactions_reap_service.cpp b/src/mongo/db/session/internal_transactions_reap_service.cpp
index b547a54b6a7..c06cacd121e 100644
--- a/src/mongo/db/transaction/internal_transactions_reap_service.cpp
+++ b/src/mongo/db/session/internal_transactions_reap_service.cpp
@@ -27,10 +27,10 @@
* it in the license file.
*/
-#include "mongo/db/transaction/internal_transactions_reap_service.h"
+#include "mongo/db/session/internal_transactions_reap_service.h"
+#include "mongo/db/session/internal_transactions_reap_service_gen.h"
#include "mongo/db/session/session_catalog_mongod.h"
-#include "mongo/db/transaction/internal_transactions_reap_service_gen.h"
#include "mongo/logv2/log.h"
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction
diff --git a/src/mongo/db/transaction/internal_transactions_reap_service.h b/src/mongo/db/session/internal_transactions_reap_service.h
index 09da172f687..09da172f687 100644
--- a/src/mongo/db/transaction/internal_transactions_reap_service.h
+++ b/src/mongo/db/session/internal_transactions_reap_service.h
diff --git a/src/mongo/db/transaction/internal_transactions_reap_service.idl b/src/mongo/db/session/internal_transactions_reap_service.idl
index 86c8997f3aa..86c8997f3aa 100644
--- a/src/mongo/db/transaction/internal_transactions_reap_service.idl
+++ b/src/mongo/db/session/internal_transactions_reap_service.idl
diff --git a/src/mongo/db/transaction/internal_transactions_reap_service_test.cpp b/src/mongo/db/session/internal_transactions_reap_service_test.cpp
index 46a1c1fbc26..83c67d697c6 100644
--- a/src/mongo/db/transaction/internal_transactions_reap_service_test.cpp
+++ b/src/mongo/db/session/internal_transactions_reap_service_test.cpp
@@ -33,10 +33,10 @@
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session/internal_transactions_reap_service.h"
+#include "mongo/db/session/internal_transactions_reap_service_gen.h"
#include "mongo/db/session/session_catalog_mongod.h"
#include "mongo/db/session/session_txn_record_gen.h"
-#include "mongo/db/transaction/internal_transactions_reap_service.h"
-#include "mongo/db/transaction/internal_transactions_reap_service_gen.h"
#include "mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h"
namespace mongo {
diff --git a/src/mongo/db/session/session_catalog.cpp b/src/mongo/db/session/session_catalog.cpp
index 0537484c4f5..288e473ab54 100644
--- a/src/mongo/db/session/session_catalog.cpp
+++ b/src/mongo/db/session/session_catalog.cpp
@@ -52,6 +52,16 @@ const auto operationSessionDecoration =
MONGO_FAIL_POINT_DEFINE(hangAfterIncrementingNumWaitingToCheckOut);
+std::string provenanceToString(SessionCatalog::Provenance provenance) {
+ switch (provenance) {
+ case SessionCatalog::Provenance::kRouter:
+ return "router";
+ case SessionCatalog::Provenance::kParticipant:
+ return "participant";
+ }
+ MONGO_UNREACHABLE;
+}
+
} // namespace
SessionCatalog::~SessionCatalog() {
@@ -314,10 +324,11 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeIn
return sri;
}
-void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
- Session* session,
- boost::optional<KillToken> killToken,
- boost::optional<TxnNumber> clientTxnNumberStarted) {
+void SessionCatalog::_releaseSession(
+ SessionRuntimeInfo* sri,
+ Session* session,
+ boost::optional<KillToken> killToken,
+ boost::optional<TxnNumberAndProvenance> clientTxnNumberStarted) {
stdx::unique_lock<Latch> ul(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
@@ -340,16 +351,18 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
std::vector<LogicalSessionId> eagerlyReapedSessions;
if (clientTxnNumberStarted.has_value()) {
+ auto [txnNumber, provenance] = *clientTxnNumberStarted;
+
// Since the given txnNumber successfully started, we know any child sessions with older
// txnNumbers can be discarded. This needed to wait until a transaction started because that
// can fail, e.g. if the active transaction is prepared.
+ auto workerFn = _makeSessionWorkerFnForEagerReap(service, txnNumber, provenance);
auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) {
ObservableSession osession(ul, sri, &it.second);
- if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) {
- osession.markForReap(ObservableSession::ReapMode::kExclusive);
- }
+ workerFn(osession);
- bool willReap = osession._shouldBeReaped();
+ bool willReap = osession._shouldBeReaped() &&
+ (osession._reapMode == ObservableSession::ReapMode::kExclusive);
if (willReap) {
eagerlyReapedSessions.push_back(std::move(it.first));
}
@@ -360,9 +373,10 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
4,
"Erased child sessions",
"releasedLsid"_attr = session->getSessionId(),
- "clientTxnNumber"_attr = *clientTxnNumberStarted,
+ "clientTxnNumber"_attr = txnNumber,
"childSessionsRemaining"_attr = sri->childSessions.size(),
- "numReaped"_attr = numReaped);
+ "numReaped"_attr = numReaped,
+ "provenance"_attr = provenanceToString(provenance));
}
invariant(ul);
@@ -373,6 +387,19 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
}
}
+SessionCatalog::ScanSessionsCallbackFn SessionCatalog::_defaultMakeSessionWorkerFnForEagerReap(
+ ServiceContext* service, TxnNumber clientTxnNumberStarted, Provenance provenance) {
+ return [clientTxnNumberStarted](ObservableSession& osession) {
+ // If a higher txnNumber has been seen for a client and started a transaction, assume any
+ // child sessions for lower transactions have been superseded and can be reaped.
+ const auto& transactionSessionId = osession.getSessionId();
+ if (transactionSessionId.getTxnNumber() &&
+ *transactionSessionId.getTxnNumber() < clientTxnNumberStarted) {
+ osession.markForReap(ObservableSession::ReapMode::kExclusive);
+ }
+ };
+}
+
Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) {
if (isParentSessionId(lsid)) {
// We should have already compared the parent lsid when we found this SRI.
@@ -520,9 +547,10 @@ void OperationContextSession::checkOut(OperationContext* opCtx) {
checkedOutSession.emplace(std::move(scopedCheckedOutSession));
}
-void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber) {
+void OperationContextSession::observeNewTxnNumberStarted(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ SessionCatalog::TxnNumberAndProvenance txnNumberAndProvenance) {
auto& checkedOutSession = operationSessionDecoration(opCtx);
invariant(checkedOutSession);
@@ -530,7 +558,8 @@ void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx
4,
"Observing new retryable write number started on session",
"lsid"_attr = lsid,
- "txnNumber"_attr = txnNumber);
+ "txnNumber"_attr = txnNumberAndProvenance.first,
+ "provenance"_attr = txnNumberAndProvenance.second);
const auto& checkedOutLsid = (*checkedOutSession)->getSessionId();
if (isParentSessionId(lsid)) {
@@ -540,7 +569,7 @@ void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx
// parent. This is safe because both share the same SessionRuntimeInfo.
dassert(lsid == checkedOutLsid || lsid == *getParentSessionId(checkedOutLsid));
- checkedOutSession->observeNewClientTxnNumberStarted(txnNumber);
+ checkedOutSession->observeNewClientTxnNumberStarted(txnNumberAndProvenance);
} else if (isInternalSessionForRetryableWrite(lsid)) {
// Observing a new internal transaction on a retryable session.
@@ -548,7 +577,8 @@ void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx
// directly.
dassert(lsid == checkedOutLsid);
- checkedOutSession->observeNewClientTxnNumberStarted(*lsid.getTxnNumber());
+ checkedOutSession->observeNewClientTxnNumberStarted(
+ {*lsid.getTxnNumber(), txnNumberAndProvenance.second});
}
}
diff --git a/src/mongo/db/session/session_catalog.h b/src/mongo/db/session/session_catalog.h
index 2f26798ee69..e469bb0f000 100644
--- a/src/mongo/db/session/session_catalog.h
+++ b/src/mongo/db/session/session_catalog.h
@@ -60,8 +60,20 @@ class SessionCatalog {
friend class OperationContextSession;
public:
+ /**
+ * Represents which role the SessionCatalog was accessed in. The participant role for actions
+ * from a data bearing node (e.g. mongod servicing a local command) and router for a routing
+ * node (e.g. a mongos command, or mongod running a mongos command).
+ */
+ enum class Provenance { kParticipant, kRouter };
+
+ using TxnNumberAndProvenance = std::pair<TxnNumber, Provenance>;
+
+ using ScanSessionsCallbackFn = std::function<void(ObservableSession&)>;
using OnEagerlyReapedSessionsFn =
unique_function<void(ServiceContext*, std::vector<LogicalSessionId>)>;
+ using MakeSessionWorkerFnForEagerReap =
+ unique_function<ScanSessionsCallbackFn(ServiceContext*, TxnNumber, Provenance)>;
class ScopedCheckedOutSession;
class SessionToKill;
@@ -95,8 +107,6 @@ public:
*/
SessionToKill checkOutSessionForKill(OperationContext* opCtx, KillToken killToken);
- using ScanSessionsCallbackFn = std::function<void(ObservableSession&)>;
-
/**
* Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to
* each Session which matches the specified 'lsid' or 'matcher'. Does not support reaping.
@@ -142,12 +152,15 @@ public:
size_t size() const;
/**
- * Registers a callback to run when sessions are "eagerly" reaped from the catalog, ie without
- * waiting for a logical session cache refresh.
+ * Registers two callbacks: one to run when sessions are "eagerly" reaped from the catalog, ie
+ * without waiting for a logical session cache refresh, and another to override the logic that
+ * determines when to eagerly reap a session.
*/
- void setOnEagerlyReapedSessionsFn(OnEagerlyReapedSessionsFn fn) {
+ void setEagerReapSessionsFns(OnEagerlyReapedSessionsFn onEagerlyReapedSessionsFn,
+ MakeSessionWorkerFnForEagerReap makeWorkerFnForEagerReap) {
invariant(!_onEagerlyReapedSessionsFn);
- _onEagerlyReapedSessionsFn = std::move(fn);
+ _onEagerlyReapedSessionsFn = std::move(onEagerlyReapedSessionsFn);
+ _makeSessionWorkerFnForEagerReap = std::move(makeWorkerFnForEagerReap);
}
private:
@@ -189,6 +202,12 @@ private:
using SessionRuntimeInfoMap = LogicalSessionIdMap<std::unique_ptr<SessionRuntimeInfo>>;
/**
+ * Returns a callback with the default logic used to decide if a session may be reaped early.
+ */
+ static ScanSessionsCallbackFn _defaultMakeSessionWorkerFnForEagerReap(
+ ServiceContext* service, TxnNumber clientTxnNumberStarted, Provenance provenance);
+
+ /**
* Blocking method, which checks-out the session with the given 'lsid'. Called inside
* '_checkOutSession' and 'checkOutSessionForKill'.
*/
@@ -220,13 +239,19 @@ private:
void _releaseSession(SessionRuntimeInfo* sri,
Session* session,
boost::optional<KillToken> killToken,
- boost::optional<TxnNumber> clientTxnNumberStarted);
+ boost::optional<TxnNumberAndProvenance> clientTxnNumberStarted);
// Called when sessions are reaped from memory "eagerly" ie directly by the SessionCatalog
// without waiting for a logical session cache refresh. Note this is set at process startup
// before multi-threading is enabled, so no synchronization is necessary.
boost::optional<OnEagerlyReapedSessionsFn> _onEagerlyReapedSessionsFn;
+ // Returns a callback used to decide if a session may be "eagerly" reaped from the session
+ // catalog without waiting for typical logical session expiration. May be overwritten, but only
+ // at process startup before multi-threading is enabled, so no synchronization is necessary.
+ MakeSessionWorkerFnForEagerReap _makeSessionWorkerFnForEagerReap =
+ _defaultMakeSessionWorkerFnForEagerReap;
+
// Protects the state below
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SessionCatalog::_mutex");
@@ -253,7 +278,7 @@ public:
ScopedCheckedOutSession(ScopedCheckedOutSession&& other)
: _catalog(other._catalog),
- _clientTxnNumberStarted(other._clientTxnNumberStarted),
+ _clientTxnNumberStartedAndProvenance(other._clientTxnNumberStartedAndProvenance),
_sri(other._sri),
_session(other._session),
_killToken(std::move(other._killToken)) {
@@ -267,7 +292,7 @@ public:
~ScopedCheckedOutSession() {
if (_sri) {
_catalog._releaseSession(
- _sri, _session, std::move(_killToken), _clientTxnNumberStarted);
+ _sri, _session, std::move(_killToken), _clientTxnNumberStartedAndProvenance);
}
}
@@ -291,8 +316,9 @@ public:
return bool(_killToken);
}
- void observeNewClientTxnNumberStarted(TxnNumber txnNumber) {
- _clientTxnNumberStarted = txnNumber;
+ void observeNewClientTxnNumberStarted(
+ SessionCatalog::TxnNumberAndProvenance txnNumberAndProvenance) {
+ _clientTxnNumberStartedAndProvenance = txnNumberAndProvenance;
}
private:
@@ -302,8 +328,9 @@ private:
// If this session began a retryable write or transaction while checked out, this is set to the
// "client txnNumber" of that transaction, which is the top-level txnNumber for a retryable
// write or transaction sent by a client or the txnNumber in the sessionId for a retryable
- // child transaction.
- boost::optional<TxnNumber> _clientTxnNumberStarted;
+ // child transaction, and the "provenance" of the number, ie whether the number came from the
+ // router or participant role.
+ boost::optional<SessionCatalog::TxnNumberAndProvenance> _clientTxnNumberStartedAndProvenance;
SessionCatalog::SessionRuntimeInfo* _sri;
Session* _session;
@@ -510,9 +537,10 @@ public:
* Notifies the session catalog when a new transaction/retryable write is begun on the operation
* context's checked out session.
*/
- static void observeNewTxnNumberStarted(OperationContext* opCtx,
- const LogicalSessionId& lsid,
- TxnNumber txnNumber);
+ static void observeNewTxnNumberStarted(
+ OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ SessionCatalog::TxnNumberAndProvenance txnNumberAndProvenance);
private:
OperationContext* const _opCtx;
diff --git a/src/mongo/db/session/session_catalog_mongod.cpp b/src/mongo/db/session/session_catalog_mongod.cpp
index 3d48fa19ad8..7e2eab73e6d 100644
--- a/src/mongo/db/session/session_catalog_mongod.cpp
+++ b/src/mongo/db/session/session_catalog_mongod.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/repl/repl_server_parameters_gen.h"
#include "mongo/db/repl/storage_interface_impl.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session/internal_transactions_reap_service.h"
#include "mongo/db/session/session_txn_record_gen.h"
#include "mongo/db/session/sessions_collection.h"
#include "mongo/db/transaction/transaction_participant.h"
@@ -480,6 +481,16 @@ MongoDSessionCatalog* MongoDSessionCatalog::get(ServiceContext* service) {
void MongoDSessionCatalog::set(ServiceContext* service,
std::unique_ptr<MongoDSessionCatalog> sessionCatalog) {
getMongoDSessionCatalog(service) = std::move(sessionCatalog);
+
+ // Set mongod specific behaviors on the SessionCatalog.
+ SessionCatalog::get(service)->setEagerReapSessionsFns(
+ InternalTransactionsReapService::onEagerlyReapedSessions,
+ [](ServiceContext* service,
+ TxnNumber clientTxnNumberStarted,
+ SessionCatalog::Provenance provenance) {
+ return MongoDSessionCatalog::get(service)->makeSessionWorkerFnForEagerReap(
+ clientTxnNumberStarted, provenance);
+ });
}
BSONObj MongoDSessionCatalog::getConfigTxnPartialIndexSpec() {
@@ -694,6 +705,11 @@ void MongoDSessionCatalog::checkOutUnscopedSession(OperationContext* opCtx) {
_checkOutUnscopedSession(opCtx, _ti.get());
}
+SessionCatalog::ScanSessionsCallbackFn MongoDSessionCatalog::makeSessionWorkerFnForEagerReap(
+ TxnNumber clientTxnNumberStarted, SessionCatalog::Provenance provenance) {
+ return _ti->makeSessionWorkerFnForEagerReap(clientTxnNumberStarted, provenance);
+}
+
MongoDOperationContextSession::MongoDOperationContextSession(
OperationContext* opCtx, MongoDSessionCatalogTransactionInterface* ti)
: _operationContextSession(opCtx), _ti(ti) {
diff --git a/src/mongo/db/session/session_catalog_mongod.h b/src/mongo/db/session/session_catalog_mongod.h
index 910843c0744..4ffb7f8ecbc 100644
--- a/src/mongo/db/session/session_catalog_mongod.h
+++ b/src/mongo/db/session/session_catalog_mongod.h
@@ -182,6 +182,13 @@ public:
OperationContextSession::CheckInReason reason);
void checkOutUnscopedSession(OperationContext* opCtx);
+ /**
+ * Returns a function that should be used to determine when a session can be eagerly reaped from
+ * the SessionCatalog on a mongod.
+ */
+ SessionCatalog::ScanSessionsCallbackFn makeSessionWorkerFnForEagerReap(
+ TxnNumber clientTxnNumberStarted, SessionCatalog::Provenance provenance);
+
private:
std::unique_ptr<MongoDSessionCatalogTransactionInterface> _ti;
};
diff --git a/src/mongo/db/session/session_catalog_mongod_transaction_interface.h b/src/mongo/db/session/session_catalog_mongod_transaction_interface.h
index 25baa540d04..4800a57457d 100644
--- a/src/mongo/db/session/session_catalog_mongod_transaction_interface.h
+++ b/src/mongo/db/session/session_catalog_mongod_transaction_interface.h
@@ -128,6 +128,13 @@ public:
virtual ScanSessionsCallbackFn makeSessionWorkerFnForStepUp(
std::vector<SessionCatalog::KillToken>* sessionKillTokens,
std::vector<OperationSessionInfo>* sessionsToReacquireLocks) = 0;
+
+ /**
+ * Returns a function that should be used to determine when a session can be eagerly reaped from
+ * the SessionCatalog on a mongod.
+ */
+ virtual ScanSessionsCallbackFn makeSessionWorkerFnForEagerReap(
+ TxnNumber clientTxnNumberStarted, SessionCatalog::Provenance provenance) = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction/SConscript b/src/mongo/db/transaction/SConscript
index c47c180e46f..cfbff6332ae 100644
--- a/src/mongo/db/transaction/SConscript
+++ b/src/mongo/db/transaction/SConscript
@@ -7,7 +7,6 @@ env = env.Clone()
env.Library(
target='transaction',
source=[
- 'internal_transactions_reap_service.cpp',
'retryable_writes_stats.cpp',
'server_transactions_metrics.cpp',
'session_catalog_mongod_transaction_interface_impl.cpp',
@@ -15,7 +14,6 @@ env.Library(
'transaction_metrics_observer.cpp',
'transaction_participant.cpp',
'transaction_participant_resource_yielder.cpp',
- 'internal_transactions_reap_service.idl',
'transaction_participant.idl',
'transactions_stats.idl',
],
@@ -92,7 +90,6 @@ env.Library(
env.CppUnitTest(
target='db_transaction_test',
source=[
- 'internal_transactions_reap_service_test.cpp',
'transaction_api_test.cpp',
'transaction_history_iterator_test.cpp',
'transaction_operations_test.cpp',
@@ -116,6 +113,7 @@ env.CppUnitTest(
'$BUILD_DIR/mongo/db/stats/transaction_stats',
'$BUILD_DIR/mongo/db/storage/storage_control',
'$BUILD_DIR/mongo/executor/inline_executor',
+ '$BUILD_DIR/mongo/s/sharding_router_api',
'transaction',
'transaction_api',
'transaction_operations',
diff --git a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp
index b7a9aedf03f..d2986287db1 100644
--- a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp
+++ b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.cpp
@@ -105,7 +105,7 @@ MongoDSessionCatalogTransactionInterface::ScanSessionsCallbackFn
MongoDSessionCatalogTransactionInterfaceImpl::makeParentSessionWorkerFnForReap(
TxnNumber* parentSessionActiveTxnNumber) {
return [parentSessionActiveTxnNumber](ObservableSession& parentSession) {
- const auto transactionSessionId = parentSession.getSessionId();
+ const auto& transactionSessionId = parentSession.getSessionId();
const auto txnParticipant = TransactionParticipant::get(parentSession);
const auto txnRouter = TransactionRouter::get(parentSession);
@@ -127,7 +127,7 @@ MongoDSessionCatalogTransactionInterface::ScanSessionsCallbackFn
MongoDSessionCatalogTransactionInterfaceImpl::makeChildSessionWorkerFnForReap(
const TxnNumber& parentSessionActiveTxnNumber) {
return [&parentSessionActiveTxnNumber](ObservableSession& childSession) {
- const auto transactionSessionId = childSession.getSessionId();
+ const auto& transactionSessionId = childSession.getSessionId();
const auto txnParticipant = TransactionParticipant::get(childSession);
const auto txnRouter = TransactionRouter::get(childSession);
@@ -183,4 +183,25 @@ MongoDSessionCatalogTransactionInterfaceImpl::makeSessionWorkerFnForStepUp(
};
}
+MongoDSessionCatalogTransactionInterface::ScanSessionsCallbackFn
+MongoDSessionCatalogTransactionInterfaceImpl::makeSessionWorkerFnForEagerReap(
+ TxnNumber clientTxnNumberStarted, SessionCatalog::Provenance provenance) {
+ return [clientTxnNumberStarted, provenance](ObservableSession& osession) {
+ const auto& transactionSessionId = osession.getSessionId();
+ const auto txnParticipant = TransactionParticipant::get(osession);
+
+ // If a retryable session has been used for a TransactionParticipant, it may be in the
+ // retryable participant catalog. A participant triggers eager reaping after clearing its
+ // participant catalog, but a router may trigger reaping before, so we can only eager reap
+ // an initialized participant if the reap came from the participant role.
+ if (provenance == SessionCatalog::Provenance::kParticipant ||
+ txnParticipant.getActiveTxnNumberAndRetryCounter().getTxnNumber() ==
+ kUninitializedTxnNumber) {
+ if (isInternalSessionForRetryableWrite(transactionSessionId) &&
+ *transactionSessionId.getTxnNumber() < clientTxnNumberStarted) {
+ osession.markForReap(ObservableSession::ReapMode::kExclusive);
+ }
+ }
+ };
+}
} // namespace mongo
diff --git a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h
index aa982da3ccc..aad38734974 100644
--- a/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h
+++ b/src/mongo/db/transaction/session_catalog_mongod_transaction_interface_impl.h
@@ -74,6 +74,9 @@ public:
ScanSessionsCallbackFn makeSessionWorkerFnForStepUp(
std::vector<SessionCatalog::KillToken>* sessionKillTokens,
std::vector<OperationSessionInfo>* sessionsToReacquireLocks) override;
+
+ ScanSessionsCallbackFn makeSessionWorkerFnForEagerReap(
+ TxnNumber clientTxnNumberStarted, SessionCatalog::Provenance provenance) override;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction/transaction_participant.cpp b/src/mongo/db/transaction/transaction_participant.cpp
index 65c611509f5..a1e1b205fec 100644
--- a/src/mongo/db/transaction/transaction_participant.cpp
+++ b/src/mongo/db/transaction/transaction_participant.cpp
@@ -2876,7 +2876,9 @@ void TransactionParticipant::Participant::_setNewTxnNumberAndRetryCounter(
// Only observe parent sessions because retryable transactions begin the same txnNumber on
// their parent session.
OperationContextSession::observeNewTxnNumberStarted(
- opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
+ opCtx,
+ _sessionId(),
+ {txnNumberAndRetryCounter.getTxnNumber(), SessionCatalog::Provenance::kParticipant});
}
}
diff --git a/src/mongo/db/transaction/transaction_participant_test.cpp b/src/mongo/db/transaction/transaction_participant_test.cpp
index 9fb8a1d0851..54376f4cc6a 100644
--- a/src/mongo/db/transaction/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction/transaction_participant_test.cpp
@@ -59,6 +59,8 @@
#include "mongo/db/txn_retry_counter_too_old_info.h"
#include "mongo/idl/server_parameter_test_util.h"
#include "mongo/logv2/log.h"
+#include "mongo/s/session_catalog_router.h"
+#include "mongo/s/transaction_router.h"
#include "mongo/stdx/future.h"
#include "mongo/unittest/barrier.h"
#include "mongo/unittest/death_test.h"
@@ -6811,5 +6813,123 @@ TEST_F(ShardTxnParticipantTest, CheckingOutEagerlyReapedSessionDoesNotCrash) {
ErrorCodes::TransactionTooOld);
}
+// Fixture with helpers for opening a transaction in the router role (TransactionRouter) and in
+// the participant role (TransactionParticipant), plus a session catalog lookup helper.
+class TxnParticipantAndTxnRouterTest : public TxnParticipantTest {
+protected:
+ // Reports whether scanSession() invoked its callback for 'lsid' on 'sessionCatalog'. The tests
+ // use this as the "session is present in the catalog" check.
+ bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
+ bool existsInCatalog{false};
+ sessionCatalog->scanSession(
+ lsid, [&](const ObservableSession& session) { existsInCatalog = true; });
+ return existsInCatalog;
+ }
+
+ // Starts a transaction for (lsid, txnNumber) through the router role on a different
+ // OperationContext. No commit or abort is issued here, hence "LeaveOpen".
+ void runRouterTransactionLeaveOpen(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ // Held in a local so the router session object lives until the end of this lambda.
+ auto opCtxSession = std::make_unique<RouterOperationContextSession>(opCtx);
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter.beginOrContinueTxn(
+ opCtx, *opCtx->getTxnNumber(), TransactionRouter::TransactionActions::kStart);
+ });
+ }
+
+ // Starts a transaction for (lsid, txnNumber) through the participant role on a different
+ // OperationContext, after checking the session out via MongoDSessionCatalog. No commit or
+ // abort is issued here, hence "LeaveOpen".
+ void runParticipantTransactionLeaveOpen(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ // Held in a local so the checked-out session object lives until the end of this lambda.
+ auto opCtxSession = MongoDSessionCatalog::get(opCtx)->checkOutSession(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx,
+ {*opCtx->getTxnNumber()},
+ false /* autocommit */,
+ true /* startTransaction */);
+ });
+ }
+};
+
+// Exercises eager reaping when the router and participant roles use the same session catalog:
+//  1. A retryable child session that was also used by a TransactionParticipant must survive when
+//     the router role starts a higher client txnNumber; a router-only child is reaped.
+//  2. The participant role can later reap that surviving child.
+//  3. Reaping is confined to one parent/child "family"; unrelated sessions are untouched.
+// Throughout, the size() assertions show that child sessions do not add to the catalog's size.
+TEST_F(TxnParticipantAndTxnRouterTest, SkipEagerReapingSessionUsedByParticipantFromRouter) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRouterTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runRouterTransactionLeaveOpen(retryableChildLsid, 0);
+
+ auto retryableChildLsidReapable =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runRouterTransactionLeaveOpen(retryableChildLsidReapable, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidReapable, sessionCatalog));
+
+ // Use one retryable session with a TransactionParticipant and verify this blocks the router
+ // role from reaping it.
+
+ runParticipantTransactionLeaveOpen(retryableChildLsid, 0);
+
+ // Start a higher txnNumber client transaction in the router role and verify the child used with
+ // TransactionParticipant was not erased but the other one was.
+
+ parentTxnNumber++;
+ runRouterTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsidReapable, sessionCatalog));
+
+ // Verify the participant role can reap the child.
+
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runParticipantTransactionLeaveOpen(higherRetryableChildLsid, 5);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsidReapable, sessionCatalog));
+
+ // Sanity check that higher txnNumbers are reaped correctly and eager reaping only applies to
+ // parent and children sessions in the same "family."
+
+ auto parentLsid2 = makeLogicalSessionIdForTest();
+ auto parentTxnNumber2 = parentTxnNumber + 11;
+ runParticipantTransactionLeaveOpen(parentLsid2, parentTxnNumber2);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid2, parentTxnNumber2);
+ runRouterTransactionLeaveOpen(retryableChildLsid2, 12131);
+
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ parentTxnNumber2++;
+ runParticipantTransactionLeaveOpen(parentLsid2, parentTxnNumber2);
+
+ // The unrelated sessions still exist and the superseded child was reaped.
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsid2, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 1ffb0bd5a55..5ad8e707b61 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -1488,17 +1488,10 @@ void TransactionRouter::Router::_resetRouterState(
p().firstStmtId = kDefaultFirstStmtId;
}
- if (isMongos()) {
- // This may trigger removing superseded retryable sessions from the session catalog, but the
- // router and mongod roles share the catalog, and the mongod role makes assumptions around
- // the validity of retryable sessions that can be violated if the router role triggers
- // reaping them. So only early reap on a mongos, and let retryable sessions be either early
- // reaped by the mongod role or by the session reaper periodic job.
- //
- // TODO SERVER-76368: Allow the router role to always trigger an early reap.
- OperationContextSession::observeNewTxnNumberStarted(
- opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
- }
+ OperationContextSession::observeNewTxnNumberStarted(
+ opCtx,
+ _sessionId(),
+ {txnNumberAndRetryCounter.getTxnNumber(), SessionCatalog::Provenance::kRouter});
};
void TransactionRouter::Router::_resetRouterStateForStartTransaction(
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 2ba049e65d8..e0ba9490a74 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -43,7 +43,6 @@
#include "mongo/logv2/log.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/catalog/type_shard.h"
-#include "mongo/s/is_mongos.h"
#include "mongo/s/router_transactions_metrics.h"
#include "mongo/s/session_catalog_router.h"
#include "mongo/s/sharding_router_test_fixture.h"
@@ -140,13 +139,6 @@ protected:
_staleVersionAndSnapshotRetriesBlock = std::make_unique<FailPointEnableBlock>(
"enableStaleVersionAndSnapshotRetriesWithinTransactions");
-
- setMongos(true);
- }
-
- void tearDown() override {
- setMongos(false);
- ShardingTestFixture::tearDown();
}
void disableRouterRetriesFailPoint() {
@@ -5429,36 +5421,5 @@ TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewRetryableTransa
ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
}
-TEST_F(TransactionRouterTest, SkipEagerReapingOnMongod) {
- setMongos(false);
- ON_BLOCK_EXIT([] { setMongos(true); });
-
- auto sessionCatalog = SessionCatalog::get(getServiceContext());
- ASSERT_EQ(sessionCatalog->size(), 0);
-
- // Add a parent session with one retryable child.
-
- auto parentLsid = makeLogicalSessionIdForTest();
- auto parentTxnNumber = 0;
- runTransactionLeaveOpen(parentLsid, parentTxnNumber);
-
- auto retryableChildLsid =
- makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
- runTransactionLeaveOpen(retryableChildLsid, 0);
-
- ASSERT_EQ(sessionCatalog->size(), 1);
- ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
- ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
-
- // Start a higher txnNumber client transaction and verify the child was not erased.
-
- parentTxnNumber++;
- runTransactionLeaveOpen(parentLsid, parentTxnNumber);
-
- ASSERT_EQ(sessionCatalog->size(), 1);
- ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
- ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
-}
-
} // unnamed namespace
} // namespace mongo