-rw-r--r--  jstests/concurrency/fsm_workloads/internal_transactions_resharding.js    4
-rw-r--r--  jstests/concurrency/fsm_workloads/internal_transactions_sharded.js       3
-rw-r--r--  jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js    33
-rw-r--r--  jstests/sharding/internal_txns/eager_reaping.js                         215
-rw-r--r--  src/mongo/db/SConscript                                                   4
-rw-r--r--  src/mongo/db/internal_transactions_reap_service.cpp                     161
-rw-r--r--  src/mongo/db/internal_transactions_reap_service.h                       111
-rw-r--r--  src/mongo/db/internal_transactions_reap_service.idl                      45
-rw-r--r--  src/mongo/db/internal_transactions_reap_service_test.cpp                385
-rw-r--r--  src/mongo/db/mongod_main.cpp                                              4
-rw-r--r--  src/mongo/db/session_catalog.cpp                                         21
-rw-r--r--  src/mongo/db/session_catalog.h                                           17
-rw-r--r--  src/mongo/db/session_catalog_mongod.cpp                                 141
-rw-r--r--  src/mongo/db/session_catalog_mongod.h                                    10
14 files changed, 1090 insertions(+), 64 deletions(-)
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js
index 31c6e3df3b8..23fa3fcc77e 100644
--- a/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js
+++ b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js
@@ -26,6 +26,10 @@ var $config = extendWorkload($config, function($config, $super) {
$config.data.currentShardKeyIndex = -1;
$config.data.reshardingCount = 0;
+ // TODO SERVER-67076: Investigate segfault in resharding image collection agg stage with
+ // concurrent reaps.
+ $config.data.overrideReapThreshold = false;
+
$config.data.getQueryForDocument = function getQueryForDocument(doc) {
// The query for a write command against a sharded collection must contain the shard key.
const query = $super.data.getQueryForDocument.apply(this, arguments);
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
index a38fb8b020f..f2c7b502a8d 100644
--- a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
+++ b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js
@@ -105,6 +105,9 @@ var $config = extendWorkload($config, function($config, $super) {
}
}
+ if (this.overrideReapThreshold) {
+ this.overrideInternalTransactionsReapThreshold(cluster);
+ }
this.overrideStoreFindAndModifyImagesInSideCollection(cluster);
if (this.lowerTransactionLifetimeLimitSeconds) {
this.overrideTransactionLifetimeLimit(cluster);
diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js
index e4136b12137..e6736c0c6d2 100644
--- a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js
+++ b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js
@@ -75,6 +75,11 @@ var $config = extendWorkload($config, function($config, $super) {
// during setup() and restores the original value during teardown().
$config.data.originalStoreFindAndModifyImagesInSideCollection = {};
+ // The reap threshold is overridden to get coverage for when it schedules reaps during an active
+ // workload.
+ $config.data.originalInternalSessionReapThreshold = {};
+ $config.data.overrideReapThreshold = true;
+
// This workload supports setting the 'transactionLifetimeLimitSeconds' to 45 seconds
// (configurable) during setup() and restoring the original value during teardown().
$config.data.lowerTransactionLifetimeLimitSeconds = false;
@@ -474,6 +479,28 @@ var $config = extendWorkload($config, function($config, $super) {
assert.commandWorked(bulk.execute());
};
+ $config.data.overrideInternalTransactionsReapThreshold =
+ function overrideInternalTransactionsReapThreshold(cluster) {
+ const newThreshold = this.generateRandomInt(0, 4);
+ print("Setting internalSessionsReapThreshold to " + newThreshold);
+ cluster.executeOnMongodNodes((db) => {
+ const res = assert.commandWorked(
+ db.adminCommand({setParameter: 1, internalSessionsReapThreshold: newThreshold}));
+ this.originalInternalSessionReapThreshold[db.getMongo().host] = res.was;
+ });
+ };
+
+ $config.data.restoreInternalTransactionsReapThreshold =
+ function restoreInternalTransactionsReapThreshold(cluster) {
+ cluster.executeOnMongodNodes((db) => {
+ assert.commandWorked(db.adminCommand({
+ setParameter: 1,
+ internalSessionsReapThreshold:
+ this.originalInternalSessionReapThreshold[db.getMongo().host]
+ }));
+ });
+ };
+
$config.data.overrideStoreFindAndModifyImagesInSideCollection =
function overrideStoreFindAndModifyImagesInSideCollection(cluster) {
// Store the findAndModify images in the oplog half of the time.
@@ -529,6 +556,9 @@ var $config = extendWorkload($config, function($config, $super) {
this.insertInitialDocuments(db, collName, tid);
}
}
+ if (this.overrideReapThreshold) {
+ this.overrideInternalTransactionsReapThreshold(cluster);
+ }
this.overrideStoreFindAndModifyImagesInSideCollection(cluster);
if (this.lowerTransactionLifetimeLimitSeconds) {
this.overrideTransactionLifetimeLimit(cluster);
@@ -536,6 +566,9 @@ var $config = extendWorkload($config, function($config, $super) {
};
$config.teardown = function teardown(db, collName, cluster) {
+ if (this.overrideReapThreshold) {
+ this.restoreInternalTransactionsReapThreshold(cluster);
+ }
this.restoreStoreFindAndModifyImagesInSideCollection(cluster);
if (this.lowerTransactionLifetimeLimitSeconds) {
this.restoreTransactionLifetimeLimit(cluster);
diff --git a/jstests/sharding/internal_txns/eager_reaping.js b/jstests/sharding/internal_txns/eager_reaping.js
new file mode 100644
index 00000000000..cb584c77b70
--- /dev/null
+++ b/jstests/sharding/internal_txns/eager_reaping.js
@@ -0,0 +1,215 @@
+/*
+ * Tests that the on-disk transaction records for retryable internal sessions are eagerly reaped
+ * when those sessions are reaped early from memory.
+ *
+ * @tags: [requires_fcv_60, uses_transactions]
+ */
+(function() {
+"use strict";
+
+const st = new ShardingTest({shards: 1, config: 1});
+
+const kDbName = "testDb";
+const kCollName = "testColl";
+const mongosTestColl = st.s.getCollection(kDbName + "." + kCollName);
+assert.commandWorked(mongosTestColl.insert({x: 1})); // Set up the collection.
+
+function assertNumEntries(conn,
+ {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries}) {
+ const filter = {"_id.id": sessionUUID};
+
+ const imageColl = conn.getCollection("config.image_collection");
+ assert.eq(numImageCollectionEntries,
+ imageColl.find(filter).itcount(),
+ tojson(imageColl.find().toArray()));
+
+ const transactionsColl = conn.getCollection("config.transactions");
+ assert.eq(numTransactionsCollEntries,
+ transactionsColl.find(filter).itcount(),
+ tojson(transactionsColl.find().toArray()));
+}
+
+function runInternalTxn(conn, lsid, txnNumber) {
+ const testInternalTxnCmdObj = {
+ testInternalTransactions: 1,
+ commandInfos: [{
+ dbName: kDbName,
+ command: {
+ // Use findAndModify to generate image collection entries.
+ findAndModify: kCollName,
+ query: {x: 1},
+ update: {$inc: {counter: 1}},
+ stmtId: NumberInt(3),
+ }
+ }],
+ lsid: lsid,
+
+ };
+ if (txnNumber !== undefined) {
+ testInternalTxnCmdObj.txnNumber = NumberLong(txnNumber);
+ }
+ assert.commandWorked(conn.adminCommand(testInternalTxnCmdObj));
+}
+
+function assertNumEntriesSoon(
+ shardConn, {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries}) {
+ // Sleep a little so it's likely the reaping has finished and we can avoid spamming the logs.
+ sleep(100);
+ assert.soonNoExcept(() => {
+ assertNumEntries(shardConn,
+ {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries});
+ return true;
+ }, "Expected internal transactions to be reaped eventually", undefined, 100 /* interval */);
+}
+
+function runTest(conn, shardConn) {
+ // Lower the threshold to speed up the test and verify it's respected.
+ const reapThreshold = 100;
+ assert.commandWorked(
+ shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: reapThreshold}));
+
+ //
+ // Reaping happens at the threshold.
+ //
+
+ let parentLsid = {id: UUID()};
+
+ // No transaction records at first.
+ assertNumEntries(
+ shardConn,
+ {sessionUUID: parentLsid.id, numImageCollectionEntries: 0, numTransactionsCollEntries: 0});
+
+ // Records build up until the reap threshold.
+ for (let i = 0; i < reapThreshold; i++) {
+ runInternalTxn(conn, parentLsid, i);
+ }
+ assertNumEntries(shardConn, {
+ sessionUUID: parentLsid.id,
+ numImageCollectionEntries: reapThreshold,
+ numTransactionsCollEntries: reapThreshold
+ });
+
+ // Push the number of eagerly reaped sessions up to the threshold and verify this triggers
+ // reaping them.
+ runInternalTxn(conn, parentLsid, reapThreshold + 1);
+ assertNumEntriesSoon(
+ shardConn,
+ {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 1});
+
+ //
+ // Reaping can run more than once.
+ //
+
+ for (let i = 0; i < reapThreshold; i++) {
+ // We're on the same session as before, so pick higher txnNumbers than used before.
+ const txnNumber = i + reapThreshold + 1;
+ runInternalTxn(conn, parentLsid, txnNumber);
+ }
+ assertNumEntriesSoon(
+ shardConn,
+ {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 1});
+
+ //
+ // Buffered sessions are cleared on failover.
+ //
+
+ parentLsid = {id: UUID()};
+
+ const numBeforeFailover = (reapThreshold / 2) + 1;
+ for (let i = 0; i < numBeforeFailover; i++) {
+ runInternalTxn(conn, parentLsid, i);
+ }
+ assertNumEntries(shardConn, {
+ sessionUUID: parentLsid.id,
+ numImageCollectionEntries: numBeforeFailover,
+ numTransactionsCollEntries: numBeforeFailover
+ });
+
+ // Step the primary down and then back up, and verify it only reaps newly expired internal
+ // sessions.
+
+ assert.commandWorked(
+ shardConn.adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: true}));
+ assert.commandWorked(shardConn.adminCommand({replSetFreeze: 0}));
+ st.rs0.stepUp(shardConn);
+ shardConn = st.rs0.getPrimary();
+
+ const numAfterFailover = (reapThreshold / 2) + 1;
+ assert(numAfterFailover + numBeforeFailover > reapThreshold);
+ for (let i = 0; i < numAfterFailover; i++) {
+ const txnNumber = i + numBeforeFailover; // Account for txnNumbers used before failover.
+ runInternalTxn(conn, parentLsid, txnNumber);
+ }
+ assertNumEntries(shardConn, {
+ sessionUUID: parentLsid.id,
+ numImageCollectionEntries: numBeforeFailover + numAfterFailover,
+ numTransactionsCollEntries: numBeforeFailover + numAfterFailover
+ });
+
+ // Insert up to the threshold and verify a reap is triggered.
+ for (let i = 0; i < reapThreshold - numAfterFailover; i++) {
+ const txnNumber = i + 1000; // Account for txnNumbers used earlier.
+ runInternalTxn(conn, parentLsid, txnNumber);
+ }
+ assertNumEntriesSoon(shardConn, {
+ sessionUUID: parentLsid.id,
+ numImageCollectionEntries: numBeforeFailover,
+ numTransactionsCollEntries: numBeforeFailover
+ });
+
+ //
+ // Reaping ignores non-retryable sessions and parent sessions.
+ //
+
+ parentLsid = {id: UUID()};
+
+ runInternalTxn(conn, parentLsid); // Non-retryable transaction.
+ assert.commandWorked(conn.getDB("test").runCommand({
+ insert: "foo",
+ documents: [{x: 1}],
+ lsid: parentLsid,
+ txnNumber: NumberLong(0),
+ stmtId: NumberInt(0)
+ }));
+
+ // Run enough retryable transactions to trigger a reap.
+ for (let i = 0; i < reapThreshold + 1; i++) {
+ const txnNumber = i + 1; // Account for the retryable write's txnNumber.
+ runInternalTxn(conn, parentLsid, txnNumber);
+ }
+ // Expect 3: the parent entry, the non-retryable entry, and the latest retryable child. Only the
+ // retryable child has an image entry, so just expect 1 of those.
+ assertNumEntriesSoon(
+ shardConn,
+ {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 3});
+}
+
+// Validates the behavior of the configurable reap threshold server parameter.
+function runParameterTest(conn, shardConn) {
+ // Must be a number.
+ assert.commandFailedWithCode(
+ shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: "wontwork"}),
+ ErrorCodes.BadValue);
+
+ // Can't be set negative.
+ assert.commandFailedWithCode(
+ shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: -1}),
+ ErrorCodes.BadValue);
+
+ // Can be set to 0 or a positive value.
+ assert.commandWorked(
+ shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 0}));
+ assert.commandWorked(
+ shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 12345}));
+
+ // Doesn't exist on mongos. This fails with no error code so check the errmsg.
+ const res = assert.commandFailed(
+ conn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 222}));
+ assert(res.errmsg.includes("unrecognized parameter"), tojson(res));
+}
+
+runTest(st.s, st.rs0.getPrimary());
+runParameterTest(st.s, st.rs0.getPrimary());
+
+st.stop();
+})();
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 672c234dd3d..1518f7bfcb0 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -804,6 +804,7 @@ env.Library(
env.Library(
target='transaction',
source=[
+ 'internal_transactions_reap_service.cpp',
'retryable_writes_stats.cpp',
'server_transactions_metrics.cpp',
'session_catalog_mongod.cpp',
@@ -811,6 +812,7 @@ env.Library(
'transaction_metrics_observer.cpp',
'transaction_participant.cpp',
'transaction_participant_resource_yielder.cpp',
+ 'internal_transactions_reap_service.idl',
'session_txn_record.idl',
'transaction_participant.idl',
'transactions_stats.idl',
@@ -827,6 +829,7 @@ env.Library(
'$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/index/index_access_method',
+ '$BUILD_DIR/mongo/db/repl/replica_set_aware_service',
'$BUILD_DIR/mongo/idl/idl_parser',
'$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/s/sharding_router_api',
@@ -2669,6 +2672,7 @@ if wiredtiger:
'index_build_entry_helpers_test.cpp',
'index_builds_coordinator_mongod_test.cpp',
'internal_session_pool_test.cpp',
+ 'internal_transactions_reap_service_test.cpp',
'keypattern_test.cpp',
'keys_collection_document_test.cpp',
'logical_session_cache_test.cpp',
diff --git a/src/mongo/db/internal_transactions_reap_service.cpp b/src/mongo/db/internal_transactions_reap_service.cpp
new file mode 100644
index 00000000000..997de84b376
--- /dev/null
+++ b/src/mongo/db/internal_transactions_reap_service.cpp
@@ -0,0 +1,161 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction
+
+#include "mongo/db/internal_transactions_reap_service.h"
+
+#include "mongo/db/internal_transactions_reap_service_gen.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/logv2/log.h"
+
+namespace mongo {
+
+namespace {
+
+MONGO_FAIL_POINT_DEFINE(pauseInternalTransactionsReaperAfterSwap);
+
+const auto serviceDecoration = ServiceContext::declareDecoration<InternalTransactionsReapService>();
+
+} // namespace
+
+InternalTransactionsReapService* InternalTransactionsReapService::get(
+ ServiceContext* serviceContext) {
+ return &serviceDecoration(serviceContext);
+}
+
+InternalTransactionsReapService* InternalTransactionsReapService::get(OperationContext* opCtx) {
+ return get(opCtx->getServiceContext());
+}
+
+const ReplicaSetAwareServiceRegistry::Registerer<InternalTransactionsReapService>
+ internalTransactionsReapServiceRegisterer("InternalTransactionsReapService");
+
+InternalTransactionsReapService::InternalTransactionsReapService() {
+ _threadPool = std::make_shared<ThreadPool>([] {
+ ThreadPool::Options options;
+ options.poolName = "InternalTransactionsReapService";
+ options.minThreads = 0;
+ options.maxThreads = 1;
+ return options;
+ }());
+}
+
+void InternalTransactionsReapService::onEagerlyReapedSessions(
+ ServiceContext* service, std::vector<LogicalSessionId> lsidsToRemove) {
+ InternalTransactionsReapService::get(service)->addEagerlyReapedSessions(
+ service, std::move(lsidsToRemove));
+}
+
+void InternalTransactionsReapService::addEagerlyReapedSessions(
+ ServiceContext* service, std::vector<LogicalSessionId> lsidsToRemove) {
+ auto reapThreshold = internalSessionsReapThreshold.loadRelaxed();
+ if (reapThreshold == 0) {
+ // A threshold of 0 disables eager reaping.
+ return;
+ }
+
+ stdx::lock_guard lg(_mutex);
+ if (!_enabled) {
+ return;
+ }
+
+ _lsidsToEagerlyReap.insert(
+ _lsidsToEagerlyReap.end(), lsidsToRemove.begin(), lsidsToRemove.end());
+
+ // reapThreshold is an integer, but is always greater than or equal to 0 so it
+ // should be safe to cast to size_t.
+ bool isAtThreshold = _lsidsToEagerlyReap.size() >= static_cast<size_t>(reapThreshold);
+ bool isCurrentlyDrainingSessions = _drainedSessionsFuture && !_drainedSessionsFuture->isReady();
+
+ if (isAtThreshold && !isCurrentlyDrainingSessions) {
+ // Kick off reaping the buffer of internal transaction sessions.
+ _drainedSessionsFuture.reset();
+ _drainedSessionsFuture = ExecutorFuture<void>(_threadPool).then([this, service] {
+ _reapInternalTransactions(service);
+ });
+ }
+}
+
+void InternalTransactionsReapService::onStartup(OperationContext* opCtx) {
+ _threadPool->startup();
+}
+
+void InternalTransactionsReapService::onShutdown() {
+ _threadPool->shutdown();
+ _threadPool->join();
+}
+
+void InternalTransactionsReapService::_reapInternalTransactions(ServiceContext* service) try {
+ ThreadClient tc("reap-internal-transactions", service);
+ {
+ stdx::lock_guard<Client> lk(*tc.get());
+ tc->setSystemOperationKillableByStepdown(lk);
+ }
+ auto uniqueOpCtx = tc->makeOperationContext();
+ auto opCtx = uniqueOpCtx.get();
+ opCtx->setAlwaysInterruptAtStepDownOrUp();
+
+ std::vector<LogicalSessionId> lsidsToRemove;
+ {
+ using std::swap;
+ stdx::lock_guard lg(_mutex);
+ swap(lsidsToRemove, _lsidsToEagerlyReap);
+ }
+
+ pauseInternalTransactionsReaperAfterSwap.pauseWhileSet(opCtx);
+
+ LOGV2_DEBUG(6697300,
+ 2,
+ "Eagerly reaping internal transactions from disk",
+ "numToReap"_attr = lsidsToRemove.size());
+
+ auto numReaped = MongoDSessionCatalog::removeSessionsTransactionRecords(opCtx, lsidsToRemove);
+
+ LOGV2_DEBUG(
+ 6697301, 2, "Eagerly reaped internal transactions from disk", "numReaped"_attr = numReaped);
+} catch (const DBException& ex) {
+ // Ignore errors.
+ LOGV2(6697302,
+ "Failed to eagerly reap internal transactions from disk",
+ "error"_attr = redact(ex));
+}
+
+void InternalTransactionsReapService::waitForCurrentDrain_forTest() {
+ if (_drainedSessionsFuture) {
+ _drainedSessionsFuture->wait();
+ }
+ return;
+}
+
+bool InternalTransactionsReapService::hasCurrentDrain_forTest() {
+ return _drainedSessionsFuture && !_drainedSessionsFuture->isReady();
+}
+
+} // namespace mongo
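The service above implements a buffer-and-drain pattern: lsids handed to addEagerlyReapedSessions() accumulate under a mutex, and once the configured threshold is crossed while no drain is in flight, the buffer is swapped out and deleted on a background thread. The following is a minimal standalone sketch of that pattern only; the class and member names are illustrative, and it uses std::async in place of the real service's ThreadPool and ExecutorFuture.

#include <chrono>
#include <cstddef>
#include <future>
#include <mutex>
#include <utility>
#include <vector>

// Simplified stand-in for InternalTransactionsReapService (illustrative only).
class EagerReapBuffer {
public:
    explicit EagerReapBuffer(std::size_t threshold) : _threshold(threshold) {}

    // Mirrors addEagerlyReapedSessions(): buffer ids and start a drain once the
    // threshold is reached and no drain is already running.
    void add(const std::vector<int>& ids) {
        std::lock_guard<std::mutex> lk(_mutex);
        _buffer.insert(_buffer.end(), ids.begin(), ids.end());

        const bool atThreshold = _buffer.size() >= _threshold;
        const bool draining = _drain.valid() &&
            _drain.wait_for(std::chrono::seconds(0)) != std::future_status::ready;
        if (atThreshold && !draining) {
            _drain = std::async(std::launch::async, [this] { _drainBuffer(); });
        }
    }

    // Mirrors waitForCurrentDrain_forTest().
    void waitForDrain() {
        if (_drain.valid())
            _drain.wait();
    }

private:
    // Mirrors _reapInternalTransactions(): swap the buffer out under the lock,
    // then delete the swapped-out ids without holding the lock.
    void _drainBuffer() {
        std::vector<int> toReap;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            std::swap(toReap, _buffer);
        }
        // Delete the records for toReap from storage here (elided).
    }

    const std::size_t _threshold;
    std::mutex _mutex;
    std::vector<int> _buffer;
    std::future<void> _drain;
};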
diff --git a/src/mongo/db/internal_transactions_reap_service.h b/src/mongo/db/internal_transactions_reap_service.h
new file mode 100644
index 00000000000..75fd5aba30b
--- /dev/null
+++ b/src/mongo/db/internal_transactions_reap_service.h
@@ -0,0 +1,111 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/util/concurrency/thread_pool.h"
+
+namespace mongo {
+
+/**
+ * Service responsible for removing the persisted metadata of completed internal transactions once
+ * they can no longer be used. Only enabled on replica set primary nodes.
+ */
+class InternalTransactionsReapService
+ : public ReplicaSetAwareService<InternalTransactionsReapService> {
+public:
+ InternalTransactionsReapService();
+
+ static InternalTransactionsReapService* get(ServiceContext* serviceContext);
+ static InternalTransactionsReapService* get(OperationContext* opCtx);
+
+ /**
+ * Adds the eagerly reaped sessions to the reap service to be buffered and later reaped. Does
+ * not verify they have actually expired, so callers must guarantee they are safe to remove.
+ */
+ static void onEagerlyReapedSessions(ServiceContext* service,
+ std::vector<LogicalSessionId> lsidsToRemove);
+
+ /**
+ * Buffers the sessions to be reaped. Once the reap threshold has been reached, schedules a task
+ * to reap the buffered sessions' transaction records.
+ */
+ void addEagerlyReapedSessions(ServiceContext* service,
+ std::vector<LogicalSessionId> lsidsToRemove);
+
+ /**
+ * Used in tests to wait for a kicked off drain to complete.
+ */
+ void waitForCurrentDrain_forTest();
+
+ /**
+ * Used in tests to detect when the drain task is running.
+ */
+ bool hasCurrentDrain_forTest();
+
+private:
+ /**
+ * Removes all currently buffered session ids from config.transactions and
+ * config.image_collection.
+ */
+ void _reapInternalTransactions(ServiceContext* service);
+
+ void onStartup(OperationContext* opCtx) override final;
+ void onShutdown() override final;
+
+ void onStepUpComplete(OperationContext* opCtx, long long term) override final {
+ stdx::lock_guard lg(_mutex);
+ _enabled = true;
+ }
+
+ void onStepDown() override final {
+ stdx::lock_guard lg(_mutex);
+ _enabled = false;
+ _lsidsToEagerlyReap.clear();
+ }
+
+ void onInitialDataAvailable(OperationContext* opCtx,
+ bool isMajorityDataAvailable) override final {}
+ void onStepUpBegin(OperationContext* opCtx, long long term) override final {}
+ void onBecomeArbiter() override final {}
+
+ std::shared_ptr<ThreadPool> _threadPool;
+
+ // Protects the state below.
+ mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1),
+ "InternalTransactionsReapService::_mutex");
+
+ bool _enabled{false};
+ boost::optional<ExecutorFuture<void>> _drainedSessionsFuture;
+ std::vector<LogicalSessionId> _lsidsToEagerlyReap;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/internal_transactions_reap_service.idl b/src/mongo/db/internal_transactions_reap_service.idl
new file mode 100644
index 00000000000..86c8997f3aa
--- /dev/null
+++ b/src/mongo/db/internal_transactions_reap_service.idl
@@ -0,0 +1,45 @@
+# Copyright (C) 2022-present MongoDB, Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the Server Side Public License, version 1,
+# as published by MongoDB, Inc.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# Server Side Public License for more details.
+#
+# You should have received a copy of the Server Side Public License
+# along with this program. If not, see
+# <http://www.mongodb.com/licensing/server-side-public-license>.
+#
+# As a special exception, the copyright holders give permission to link the
+# code of portions of this program with the OpenSSL library under certain
+# conditions as described in each individual source file and distribute
+# linked combinations including the program with the OpenSSL library. You
+# must comply with the Server Side Public License in all respects for
+# all of the code used other than as permitted herein. If you modify file(s)
+# with this exception, you may extend this exception to your version of the
+# file(s), but you are not obligated to do so. If you do not wish to do so,
+# delete this exception statement from your version. If you delete this
+# exception statement from all source files in the program, then also delete
+# it in the license file.
+#
+
+# This IDL file defines the internalSessionsReapThreshold server parameter, which
+# controls how many buffered, eagerly reaped internal sessions accumulate before
+# their on-disk transaction records are removed.
+
+global:
+ cpp_namespace: "mongo"
+
+server_parameters:
+ internalSessionsReapThreshold:
+ description: 'The minimum number of expired internal sessions that must be buffered in memory
+ before they are eagerly reaped from disk. 0 disables eager reaping.'
+ set_at: [startup, runtime]
+ cpp_vartype: 'AtomicWord<int>'
+ cpp_varname: internalSessionsReapThreshold
+ default: 1000
+ validator:
+ gte: 0
diff --git a/src/mongo/db/internal_transactions_reap_service_test.cpp b/src/mongo/db/internal_transactions_reap_service_test.cpp
new file mode 100644
index 00000000000..0e9d956b627
--- /dev/null
+++ b/src/mongo/db/internal_transactions_reap_service_test.cpp
@@ -0,0 +1,385 @@
+/**
+ * Copyright (C) 2022-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/internal_transactions_reap_service.h"
+#include "mongo/db/internal_transactions_reap_service_gen.h"
+#include "mongo/db/repl/image_collection_entry_gen.h"
+#include "mongo/db/repl/replica_set_aware_service.h"
+#include "mongo/db/repl/replication_coordinator_mock.h"
+#include "mongo/db/repl/storage_interface_impl.h"
+#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_catalog_mongod.h"
+#include "mongo/db/session_txn_record_gen.h"
+
+namespace mongo {
+namespace {
+
+class InternalTransactionsReapServiceTest : public ServiceContextMongoDTest {
+protected:
+ void setUp() override {
+ ServiceContextMongoDTest::setUp();
+
+ const auto service = getServiceContext();
+ _opCtxHolder = makeOperationContext();
+ const auto opCtx = _opCtxHolder.get();
+
+ repl::StorageInterface::set(service, std::make_unique<repl::StorageInterfaceImpl>());
+ repl::ReplicationCoordinator::set(
+ service,
+ std::make_unique<repl::ReplicationCoordinatorMock>(service, createReplSettings()));
+ repl::createOplog(opCtx);
+
+ ReplicaSetAwareServiceRegistry::get(service).onStartup(opCtx);
+
+ auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY));
+ MongoDSessionCatalog::onStepUp(opCtx);
+
+ // Lower the reap threshold for faster tests.
+ _originalInternalSessionsReapThreshold = internalSessionsReapThreshold.load();
+ internalSessionsReapThreshold.store(50);
+
+ onStepUp();
+ }
+
+ void tearDown() override {
+ internalSessionsReapThreshold.store(_originalInternalSessionsReapThreshold);
+ _opCtxHolder.reset();
+ ReplicaSetAwareServiceRegistry::get(getServiceContext()).onShutdown();
+ ServiceContextMongoDTest::tearDown();
+ }
+
+ InternalTransactionsReapService* reapService() {
+ return InternalTransactionsReapService::get(getServiceContext());
+ }
+
+ OperationContext* opCtx() const {
+ return _opCtxHolder.get();
+ }
+
+ void onStepUp() {
+ // Set a last applied opTime to satisfy an invariant in the primary only service registry.
+ auto newLastAppliedOpTime = repl::OpTime(Timestamp(1, 1), 1);
+ repl::ReplicationCoordinator::get(getServiceContext())
+ ->setMyLastAppliedOpTimeAndWallTime({newLastAppliedOpTime, Date_t()});
+ ReplicaSetAwareServiceRegistry::get(getServiceContext()).onStepUpComplete(opCtx(), 1LL);
+ }
+
+ void onStepDown() {
+ ReplicaSetAwareServiceRegistry::get(getServiceContext()).onStepDown();
+ }
+
+ // Inserts entries for the given sessions into config.transactions and config.image_collection.
+ void insertSessionDocuments(const std::vector<LogicalSessionId>& lsids,
+ bool skipImageEntry = false,
+ bool skipSessionEntry = false) {
+ DBDirectClient client(opCtx());
+
+ if (!skipImageEntry) {
+ write_ops::checkWriteErrors(client.insert([&] {
+ write_ops::InsertCommandRequest imageInsertOp(
+ NamespaceString::kConfigImagesNamespace);
+ imageInsertOp.setWriteCommandRequestBase([] {
+ write_ops::WriteCommandRequestBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ imageInsertOp.setDocuments([&] {
+ std::vector<BSONObj> docs;
+ for (const auto& lsid : lsids) {
+ repl::ImageEntry entry(
+ lsid, 0, Timestamp(1, 1), repl::RetryImageEnum::kPreImage, BSONObj());
+ docs.emplace_back(entry.toBSON());
+ }
+ return docs;
+ }());
+ return imageInsertOp;
+ }()));
+ }
+
+ if (!skipSessionEntry) {
+ write_ops::checkWriteErrors(client.insert([&] {
+ write_ops::InsertCommandRequest sessionInsertOp(
+ NamespaceString::kSessionTransactionsTableNamespace);
+ sessionInsertOp.setWriteCommandRequestBase([] {
+ write_ops::WriteCommandRequestBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ sessionInsertOp.setDocuments([&] {
+ std::vector<BSONObj> docs;
+ for (const auto& lsid : lsids) {
+ SessionTxnRecord record(lsid, 0, repl::OpTime(), Date_t());
+ docs.emplace_back(record.toBSON());
+ }
+ return docs;
+ }());
+ return sessionInsertOp;
+ }()));
+ }
+ }
+
+ void assertInSessionsCollection(const std::vector<LogicalSessionId>& lsids) {
+ DBDirectClient client(opCtx());
+ for (const auto& lsid : lsids) {
+ auto numImageEntries =
+ client.count(NamespaceString::kConfigImagesNamespace,
+ BSON(repl::ImageEntry::k_idFieldName << lsid.toBSON()));
+ ASSERT_EQ(1, numImageEntries) << lsid.toBSON();
+ auto numSessionEntries =
+ client.count(NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON()));
+ ASSERT_EQ(1, numSessionEntries) << lsid.toBSON();
+ }
+ }
+
+ void assertNotInSessionsCollection(const std::vector<LogicalSessionId>& lsids) {
+ DBDirectClient client(opCtx());
+ for (const auto& lsid : lsids) {
+ auto numImageEntries =
+ client.count(NamespaceString::kConfigImagesNamespace,
+ BSON(repl::ImageEntry::k_idFieldName << lsid.toBSON()));
+ ASSERT_EQ(0, numImageEntries) << lsid.toBSON();
+ auto numSessionEntries =
+ client.count(NamespaceString::kSessionTransactionsTableNamespace,
+ BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON()));
+ ASSERT_EQ(0, numSessionEntries) << lsid.toBSON();
+ }
+ }
+
+ void assertNoPersistedSessions() {
+ DBDirectClient client(opCtx());
+ auto numImageEntries = client.count(NamespaceString::kConfigImagesNamespace, BSONObj());
+ ASSERT_EQ(0, numImageEntries);
+ auto numSessionEntries =
+ client.count(NamespaceString::kSessionTransactionsTableNamespace, BSONObj());
+ ASSERT_EQ(0, numSessionEntries);
+ }
+
+ std::vector<LogicalSessionId> generateLsids(int num) {
+ std::vector<LogicalSessionId> lsids;
+ for (int i = 0; i < num; i++) {
+ lsids.push_back(makeLogicalSessionIdForTest());
+ }
+ return lsids;
+ }
+
+private:
+ repl::ReplSettings createReplSettings() {
+ repl::ReplSettings settings;
+ settings.setOplogSizeBytes(5 * 1024 * 1024);
+ settings.setReplSetString("mySet/node1:12345");
+ return settings;
+ }
+
+ int _originalInternalSessionsReapThreshold;
+ ServiceContext::UniqueOperationContext _opCtxHolder;
+};
+
+TEST_F(InternalTransactionsReapServiceTest, DoesNotReapUntilThreshold) {
+ auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1);
+ insertSessionDocuments(lsidsBelowThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold);
+ assertInSessionsCollection(lsidsBelowThreshold);
+
+ auto lsidToReachThreshold = generateLsids(1);
+ insertSessionDocuments(lsidToReachThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, ExceedingThresholdTriggersReap) {
+ auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1);
+ insertSessionDocuments(lsidsBelowThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold);
+ assertInSessionsCollection(lsidsBelowThreshold);
+
+ // Passing the threshold by a large amount is fine and will trigger removing all accumulated
+ // sessions, not just the threshold amount.
+ auto lsidsToExceedThreshold = generateLsids(internalSessionsReapThreshold.load() * 2);
+ insertSessionDocuments(lsidsToExceedThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToExceedThreshold);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, CanReapMultipleRounds) {
+ auto reap = [&] {
+ auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1);
+ insertSessionDocuments(lsidsBelowThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold);
+ assertInSessionsCollection(lsidsBelowThreshold);
+
+ auto lsidToReachThreshold = generateLsids(1);
+ insertSessionDocuments(lsidToReachThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+ };
+
+ // Reaps can be triggered multiple times.
+ reap();
+ reap();
+ reap();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, CanReapMoreThanMaxSessionDeletionBatchSize) {
+ auto lsidsOverBatchSize =
+ generateLsids(MongoDSessionCatalog::kMaxSessionDeletionBatchSize + 11);
+ insertSessionDocuments(lsidsOverBatchSize);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsOverBatchSize);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, OnlySchedulesOneReapAtATime) {
+ auto pauseReaperThreadFp =
+ globalFailPointRegistry().find("pauseInternalTransactionsReaperAfterSwap");
+ auto timesEnteredFailPoint = pauseReaperThreadFp->setMode(FailPoint::alwaysOn);
+
+ auto lsidsForInitialReap = generateLsids(internalSessionsReapThreshold.load());
+ insertSessionDocuments(lsidsForInitialReap);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsForInitialReap);
+ ASSERT(reapService()->hasCurrentDrain_forTest());
+ assertInSessionsCollection(lsidsForInitialReap);
+
+ // Wait for fail point.
+ pauseReaperThreadFp->waitForTimesEntered(timesEnteredFailPoint + 1);
+
+ auto lsidsDuringReap = generateLsids(internalSessionsReapThreshold.load());
+ insertSessionDocuments(lsidsDuringReap);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsDuringReap);
+ ASSERT(reapService()->hasCurrentDrain_forTest());
+ assertInSessionsCollection(lsidsDuringReap);
+
+ // Disable fail point and verify the concurrent sessions weren't reaped.
+ pauseReaperThreadFp->setMode(FailPoint::off);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNotInSessionsCollection(lsidsForInitialReap);
+ assertInSessionsCollection(lsidsDuringReap);
+
+ // The concurrently added sessions will be reaped in the next round.
+ reapService()->addEagerlyReapedSessions(getServiceContext(), generateLsids(1));
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+};
+
+TEST_F(InternalTransactionsReapServiceTest, DoesNotErrorIfReceivedSameSessionTwice) {
+ auto lsids = generateLsids(2);
+ insertSessionDocuments(lsids);
+
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsids);
+ assertInSessionsCollection(lsids);
+
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsids);
+ assertInSessionsCollection(lsids);
+
+ auto lsidsToReachThreshold = generateLsids(internalSessionsReapThreshold.load());
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToReachThreshold);
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, DoesNotErrorIfSessionOrImageIsNotOnDisk) {
+ auto lsidsNoImage = generateLsids(2);
+ insertSessionDocuments(lsidsNoImage, true /* skipImageEntry */);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoImage);
+
+ auto lsidsNoSession = generateLsids(2);
+ insertSessionDocuments(lsidsNoSession, false /* skipImageEntry */, true /* skipSessionEntry */);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoSession);
+
+ auto normalLsids = generateLsids(3);
+ insertSessionDocuments(normalLsids);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), normalLsids);
+
+ auto lsidsNoImageOrSession = generateLsids(internalSessionsReapThreshold.load());
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoImageOrSession);
+
+ reapService()->waitForCurrentDrain_forTest();
+ assertNoPersistedSessions();
+}
+
+TEST_F(InternalTransactionsReapServiceTest, DoesNotReapAsSecondaryAndClearsSessionsOnStepdown) {
+ auto lsidsAsPrimary = generateLsids(2);
+ insertSessionDocuments(lsidsAsPrimary);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsAsPrimary);
+ assertInSessionsCollection(lsidsAsPrimary);
+
+ onStepDown();
+
+ auto lsidsAsSecondary = generateLsids(internalSessionsReapThreshold.load());
+ insertSessionDocuments(lsidsAsSecondary);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsAsSecondary);
+ ASSERT_FALSE(reapService()->hasCurrentDrain_forTest());
+ assertInSessionsCollection(lsidsAsPrimary);
+ assertInSessionsCollection(lsidsAsSecondary);
+
+ onStepUp();
+
+ // Despite having seen more than the threshold's worth of sessions as a secondary and as the
+ // previous primary, no sessions should have been buffered, so this will not trigger a reap.
+ auto newLsidAsPrimary = generateLsids(1);
+ insertSessionDocuments(newLsidAsPrimary);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), newLsidAsPrimary);
+ ASSERT_FALSE(reapService()->hasCurrentDrain_forTest());
+ assertInSessionsCollection(lsidsAsPrimary);
+ assertInSessionsCollection(lsidsAsSecondary);
+ assertInSessionsCollection(newLsidAsPrimary);
+
+ auto lsidsToReachThreshold = generateLsids(internalSessionsReapThreshold.load());
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToReachThreshold);
+ reapService()->waitForCurrentDrain_forTest();
+
+ // The previous sessions were missed and will remain until the normal logical session reaper
+ // logic removes them.
+ assertInSessionsCollection(lsidsAsPrimary);
+ assertInSessionsCollection(lsidsAsSecondary);
+
+ // But newly added sessions will be reaped.
+ assertNotInSessionsCollection(newLsidAsPrimary);
+ assertNotInSessionsCollection(lsidsToReachThreshold);
+}
+
+TEST_F(InternalTransactionsReapServiceTest, ThresholdOfZeroDisablesReaping) {
+ // tearDown() will restore the original value already, so there's no need to do it in the test.
+ internalSessionsReapThreshold.store(0);
+
+ auto lsidsOverZeroThreshold = generateLsids(10);
+ insertSessionDocuments(lsidsOverZeroThreshold);
+ reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsOverZeroThreshold);
+ ASSERT_FALSE(reapService()->hasCurrentDrain_forTest());
+ assertInSessionsCollection(lsidsOverZeroThreshold);
+}
+
+} // namespace
+} // namespace mongo
diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp
index a3a56f705f6..52a9c4651d6 100644
--- a/src/mongo/db/mongod_main.cpp
+++ b/src/mongo/db/mongod_main.cpp
@@ -91,6 +91,7 @@
#include "mongo/db/index_names.h"
#include "mongo/db/initialize_server_global_state.h"
#include "mongo/db/initialize_snmp.h"
+#include "mongo/db/internal_transactions_reap_service.h"
#include "mongo/db/introspect.h"
#include "mongo/db/json.h"
#include "mongo/db/keys_collection_client_direct.h"
@@ -158,6 +159,7 @@
#include "mongo/db/serverless/shard_split_donor_service.h"
#include "mongo/db/service_context.h"
#include "mongo/db/service_entry_point_mongod.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_killer.h"
#include "mongo/db/startup_recovery.h"
#include "mongo/db/startup_warnings_mongod.h"
@@ -1514,6 +1516,8 @@ int mongod_main(int argc, char* argv[]) {
setUpReplication(service);
setUpObservers(service);
service->setServiceEntryPoint(std::make_unique<ServiceEntryPointMongod>(service));
+ SessionCatalog::get(service)->setOnEagerlyReapedSessionsFn(
+ InternalTransactionsReapService::onEagerlyReapedSessions);
ErrorExtraInfo::invariantHaveAllParsers();
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 68ff1654ed1..37a093576fa 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -309,7 +309,7 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
Session* session,
boost::optional<KillToken> killToken,
boost::optional<TxnNumber> clientTxnNumberStarted) {
- stdx::lock_guard<Latch> lg(_mutex);
+ stdx::unique_lock<Latch> ul(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
// operation context (meaning checked-out)
@@ -319,6 +319,8 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
dassert(killToken->lsidToKill == session->getSessionId());
}
+ ServiceContext* service = sri->checkoutOpCtx->getServiceContext();
+
sri->checkoutOpCtx = nullptr;
sri->availableCondVar.notify_all();
@@ -327,16 +329,22 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
--sri->killsRequested;
}
+ std::vector<LogicalSessionId> eagerlyReapedSessions;
if (clientTxnNumberStarted.has_value()) {
// Since the given txnNumber successfully started, we know any child sessions with older
// txnNumbers can be discarded. This needed to wait until a transaction started because that
// can fail, e.g. if the active transaction is prepared.
auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) {
- ObservableSession osession(lg, sri, &it.second);
+ ObservableSession osession(ul, sri, &it.second);
if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) {
osession.markForReap(ObservableSession::ReapMode::kExclusive);
}
- return osession._shouldBeReaped();
+
+ bool willReap = osession._shouldBeReaped();
+ if (willReap) {
+ eagerlyReapedSessions.push_back(std::move(it.first));
+ }
+ return willReap;
});
LOGV2_DEBUG(6685200,
@@ -347,6 +355,13 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
"childSessionsRemaining"_attr = sri->childSessions.size(),
"numReaped"_attr = numReaped);
}
+
+ invariant(ul);
+ ul.unlock();
+
+ if (eagerlyReapedSessions.size() && _onEagerlyReapedSessionsFn) {
+ (*_onEagerlyReapedSessionsFn)(service, std::move(eagerlyReapedSessions));
+ }
}
Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) {
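Note the ordering in the _releaseSession() hunk above: the eagerly reaped ids are collected while the catalog mutex is held, but the registered callback is only invoked after the lock is released, so the callback (here InternalTransactionsReapService::onEagerlyReapedSessions) can take its own locks without creating lock-ordering problems. A simplified standalone sketch of that ordering, using stand-in types rather than the real SessionCatalog members:

#include <functional>
#include <mutex>
#include <utility>
#include <vector>

// Illustrative stand-in for the SessionCatalog's release path.
struct Catalog {
    std::mutex mutex;
    std::vector<int> reapedIds;  // ids of child sessions reaped from memory
    std::function<void(std::vector<int>)> onEagerlyReaped;

    void releaseAndNotify() {
        std::vector<int> reaped;
        {
            std::unique_lock<std::mutex> ul(mutex);
            // ... reap expired child sessions from in-memory state, recording
            // their ids in reapedIds ...
            std::swap(reaped, reapedIds);
        }  // Unlock before invoking the callback so it may take other locks.
        if (!reaped.empty() && onEagerlyReaped) {
            onEagerlyReaped(std::move(reaped));
        }
    }
};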
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index b049550efd9..6201d6fd72b 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -60,6 +60,9 @@ class SessionCatalog {
friend class OperationContextSession;
public:
+ using OnEagerlyReapedSessionsFn =
+ unique_function<void(ServiceContext*, std::vector<LogicalSessionId>)>;
+
class ScopedCheckedOutSession;
class SessionToKill;
@@ -138,6 +141,15 @@ public:
*/
size_t size() const;
+ /**
+ * Registers a callback to run when sessions are "eagerly" reaped from the catalog, i.e. without
+ * waiting for a logical session cache refresh.
+ */
+ void setOnEagerlyReapedSessionsFn(OnEagerlyReapedSessionsFn fn) {
+ invariant(!_onEagerlyReapedSessionsFn);
+ _onEagerlyReapedSessionsFn = std::move(fn);
+ }
+
private:
/**
* Tracks the runtime info for transaction sessions that corresponds to the same logical
@@ -210,6 +222,11 @@ private:
boost::optional<KillToken> killToken,
boost::optional<TxnNumber> clientTxnNumberStarted);
+ // Called when sessions are reaped from memory "eagerly", i.e. directly by the SessionCatalog
+ // without waiting for a logical session cache refresh. Note this is set at process startup
+ // before multi-threading is enabled, so no synchronization is necessary.
+ boost::optional<OnEagerlyReapedSessionsFn> _onEagerlyReapedSessionsFn;
+
// Protects the state below
mutable Mutex _mutex =
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SessionCatalog::_mutex");
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index eeb1ffb175a..fcefa1ed69b 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -208,12 +208,70 @@ const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
+template <typename SessionContainer>
+int removeSessionsTransactionRecordsFromDisk(OperationContext* opCtx,
+ const SessionContainer& transactionSessionIdsToReap) {
+ if (transactionSessionIdsToReap.empty()) {
+ return 0;
+ }
+
+ // Remove the config.image_collection entries for the expired transaction session ids. We first
+ // delete any images belonging to sessions about to be reaped, followed by the sessions. This
+ // way if there's a failure, we'll only be left with sessions that have a dangling reference
+ // to an image. Session reaping will rediscover the sessions to delete and try again.
+ //
+ // We opt for this rather than performing the two sets of deletes in a single transaction simply
+ // to reduce code complexity.
+ DBDirectClient client(opCtx);
+ write_ops::checkWriteErrors(client.remove([&] {
+ write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace);
+ imageDeleteOp.setWriteCommandRequestBase([] {
+ write_ops::WriteCommandRequestBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ imageDeleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (const auto& transactionSessionId : transactionSessionIdsToReap) {
+ entries.emplace_back(
+ BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()),
+ false /* multi = false */);
+ }
+ return entries;
+ }());
+ return imageDeleteOp;
+ }()));
+
+ // Remove the config.transactions entries for the expired transaction session ids.
+ auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] {
+ write_ops::DeleteCommandRequest sessionDeleteOp(
+ NamespaceString::kSessionTransactionsTableNamespace);
+ sessionDeleteOp.setWriteCommandRequestBase([] {
+ write_ops::WriteCommandRequestBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ sessionDeleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (const auto& transactionSessionId : transactionSessionIdsToReap) {
+ entries.emplace_back(
+ BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()),
+ false /* multi = false */);
+ }
+ return entries;
+ }());
+ return sessionDeleteOp;
+ }()));
+
+ return sessionDeleteReply.getN();
+}
+
/**
* Removes the config.transactions and the config.image_collection entries for the transaction
* sessions in 'expiredTransactionSessionIdsNotInUse' that are safe to reap. Returns the number
* of transaction sessions whose entries were removed.
*/
-int removeSessionsTransactionRecords(
+int removeSessionsTransactionRecordsIfExpired(
OperationContext* opCtx,
SessionsCollection& sessionsCollection,
const LogicalSessionIdSet& expiredTransactionSessionIdsNotInUse) {
@@ -267,59 +325,7 @@ int removeSessionsTransactionRecords(
}
}
- if (transactionSessionIdsToReap.empty()) {
- return 0;
- }
-
- // Remove the config.image_collection entries for the expired transaction session ids. We first
- // delete any images belonging to sessions about to be reaped, followed by the sessions. This
- // way if there's a failure, we'll only be left with sessions that have a dangling reference
- // to an image. Session reaping will rediscover the sessions to delete and try again.
- //
- // We opt for this rather than performing the two sets of deletes in a single transaction simply
- // to reduce code complexity.
- DBDirectClient client(opCtx);
- write_ops::checkWriteErrors(client.remove([&] {
- write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace);
- imageDeleteOp.setWriteCommandRequestBase([] {
- write_ops::WriteCommandRequestBase base;
- base.setOrdered(false);
- return base;
- }());
- imageDeleteOp.setDeletes([&] {
- std::vector<write_ops::DeleteOpEntry> entries;
- for (const auto& transactionSessionId : transactionSessionIdsToReap) {
- entries.emplace_back(
- BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()),
- false /* multi = false */);
- }
- return entries;
- }());
- return imageDeleteOp;
- }()));
-
- // Remove the config.transaction entries for the expired transaction session ids.
- auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] {
- write_ops::DeleteCommandRequest sessionDeleteOp(
- NamespaceString::kSessionTransactionsTableNamespace);
- sessionDeleteOp.setWriteCommandRequestBase([] {
- write_ops::WriteCommandRequestBase base;
- base.setOrdered(false);
- return base;
- }());
- sessionDeleteOp.setDeletes([&] {
- std::vector<write_ops::DeleteOpEntry> entries;
- for (const auto& transactionSessionId : transactionSessionIdsToReap) {
- entries.emplace_back(
- BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()),
- false /* multi = false */);
- }
- return entries;
- }());
- return sessionDeleteOp;
- }()));
-
- return sessionDeleteReply.getN();
+ return removeSessionsTransactionRecordsFromDisk(opCtx, transactionSessionIdsToReap);
}
/**
@@ -340,10 +346,6 @@ int removeExpiredTransactionSessionsFromDisk(
findRequest.setProjection(kIdProjection);
auto cursor = client.find(std::move(findRequest));
- // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size
- // limit.
- const int kMaxBatchSize = 10'000;
-
LogicalSessionIdSet expiredTransactionSessionIdsNotInUse;
int numReaped = 0;
while (cursor->more()) {
@@ -357,13 +359,14 @@ int removeExpiredTransactionSessionsFromDisk(
}
expiredTransactionSessionIdsNotInUse.insert(transactionSessionId);
- if (expiredTransactionSessionIdsNotInUse.size() > kMaxBatchSize) {
- numReaped += removeSessionsTransactionRecords(
+ if (expiredTransactionSessionIdsNotInUse.size() >
+ MongoDSessionCatalog::kMaxSessionDeletionBatchSize) {
+ numReaped += removeSessionsTransactionRecordsIfExpired(
opCtx, sessionsCollection, expiredTransactionSessionIdsNotInUse);
expiredTransactionSessionIdsNotInUse.clear();
}
}
- numReaped += removeSessionsTransactionRecords(
+ numReaped += removeSessionsTransactionRecordsIfExpired(
opCtx, sessionsCollection, expiredTransactionSessionIdsNotInUse);
return numReaped;
@@ -621,6 +624,22 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx,
opCtx, sessionsCollection, possiblyExpired, expiredTransactionSessionIdsStillInUse);
}
+int MongoDSessionCatalog::removeSessionsTransactionRecords(
+ OperationContext* opCtx, const std::vector<LogicalSessionId>& transactionSessionIdsToRemove) {
+ std::vector<LogicalSessionId> nextLsidBatch;
+ int numReaped = 0;
+ for (const auto& transactionSessionIdToRemove : transactionSessionIdsToRemove) {
+ nextLsidBatch.push_back(transactionSessionIdToRemove);
+ if (nextLsidBatch.size() > MongoDSessionCatalog::kMaxSessionDeletionBatchSize) {
+ numReaped += removeSessionsTransactionRecordsFromDisk(opCtx, nextLsidBatch);
+ nextLsidBatch.clear();
+ }
+ }
+ numReaped += removeSessionsTransactionRecordsFromDisk(opCtx, nextLsidBatch);
+
+ return numReaped;
+}
+
MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* opCtx)
: _operationContextSession(opCtx) {
invariant(!opCtx->getClient()->isInDirectClient());
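The new MongoDSessionCatalog::removeSessionsTransactionRecords() above chunks the incoming session ids so that a single delete never grows past kMaxSessionDeletionBatchSize (and therefore stays under the 16MB BSON limit). A minimal sketch of that batching loop, where deleteBatch() is a hypothetical stand-in for removeSessionsTransactionRecordsFromDisk():

#include <cstddef>
#include <vector>

// Hypothetical stand-in for removeSessionsTransactionRecordsFromDisk(); here it
// just reports how many ids it was asked to delete.
int deleteBatch(const std::vector<int>& batch) {
    return static_cast<int>(batch.size());
}

// Accumulate ids and flush whenever the batch grows past the cap, mirroring the
// loop in removeSessionsTransactionRecords().
int removeInBatches(const std::vector<int>& ids, std::size_t maxBatchSize) {
    std::vector<int> batch;
    int numRemoved = 0;
    for (int id : ids) {
        batch.push_back(id);
        if (batch.size() > maxBatchSize) {
            numRemoved += deleteBatch(batch);
            batch.clear();
        }
    }
    numRemoved += deleteBatch(batch);  // flush the final partial batch
    return numRemoved;
}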
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index eb8402aece4..5c092e7e005 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -39,6 +39,10 @@ class MongoDSessionCatalog {
public:
static const std::string kConfigTxnsPartialIndexName;
+ // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size
+ // limit.
+ static const int kMaxSessionDeletionBatchSize = 10'000;
+
/**
* Returns the specification for the partial index on config.transactions used to support
* retryable transactions.
@@ -83,6 +87,12 @@ public:
static int reapSessionsOlderThan(OperationContext* opCtx,
SessionsCollection& sessionsCollection,
Date_t possiblyExpired);
+
+ /**
+ * Deletes the given session ids from config.transactions and config.image_collection.
+ */
+ static int removeSessionsTransactionRecords(OperationContext* opCtx,
+ const std::vector<LogicalSessionId>& lsidsToRemove);
};
/**