From 266864d409101588abcad8ebe75dec0cd60bf11c Mon Sep 17 00:00:00 2001 From: Jack Mulrow Date: Tue, 7 Jun 2022 17:12:34 +0000 Subject: SERVER-66973 Eagerly reap config.transactions entries for retryable sessions (cherry picked from commit 77e9ff7fd5e822ed2c560db2f601648a6dd58abf) --- .../internal_transactions_resharding.js | 4 + .../fsm_workloads/internal_transactions_sharded.js | 3 + .../internal_transactions_unsharded.js | 33 ++ jstests/sharding/internal_txns/eager_reaping.js | 215 ++++++++++++ src/mongo/db/SConscript | 4 + .../db/internal_transactions_reap_service.cpp | 161 +++++++++ src/mongo/db/internal_transactions_reap_service.h | 111 ++++++ .../db/internal_transactions_reap_service.idl | 45 +++ .../db/internal_transactions_reap_service_test.cpp | 385 +++++++++++++++++++++ src/mongo/db/mongod_main.cpp | 4 + src/mongo/db/session_catalog.cpp | 21 +- src/mongo/db/session_catalog.h | 17 + src/mongo/db/session_catalog_mongod.cpp | 141 ++++---- src/mongo/db/session_catalog_mongod.h | 10 + 14 files changed, 1090 insertions(+), 64 deletions(-) create mode 100644 jstests/sharding/internal_txns/eager_reaping.js create mode 100644 src/mongo/db/internal_transactions_reap_service.cpp create mode 100644 src/mongo/db/internal_transactions_reap_service.h create mode 100644 src/mongo/db/internal_transactions_reap_service.idl create mode 100644 src/mongo/db/internal_transactions_reap_service_test.cpp diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js index 31c6e3df3b8..23fa3fcc77e 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_resharding.js @@ -26,6 +26,10 @@ var $config = extendWorkload($config, function($config, $super) { $config.data.currentShardKeyIndex = -1; $config.data.reshardingCount = 0; + // TODO SERVER-67076: Investigate segfault in resharding image collection agg stage with + // concurrent reaps. + $config.data.overrideReapThreshold = false; + $config.data.getQueryForDocument = function getQueryForDocument(doc) { // The query for a write command against a sharded collection must contain the shard key. const query = $super.data.getQueryForDocument.apply(this, arguments); diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js index a38fb8b020f..f2c7b502a8d 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_sharded.js @@ -105,6 +105,9 @@ var $config = extendWorkload($config, function($config, $super) { } } + if (this.overrideReapThreshold) { + this.overrideInternalTransactionsReapThreshold(cluster); + } this.overrideStoreFindAndModifyImagesInSideCollection(cluster); if (this.lowerTransactionLifetimeLimitSeconds) { this.overrideTransactionLifetimeLimit(cluster); diff --git a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js index e4136b12137..e6736c0c6d2 100644 --- a/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js +++ b/jstests/concurrency/fsm_workloads/internal_transactions_unsharded.js @@ -75,6 +75,11 @@ var $config = extendWorkload($config, function($config, $super) { // during setup() and restores the original value during teardown(). 
$config.data.originalStoreFindAndModifyImagesInSideCollection = {}; + // The reap threshold is overriden to get coverage for when it schedules reaps during an active + // workload. + $config.data.originalInternalSessionReapThreshold = {}; + $config.data.overrideReapThreshold = true; + // This workload supports setting the 'transactionLifetimeLimitSeconds' to 45 seconds // (configurable) during setup() and restoring the original value during teardown(). $config.data.lowerTransactionLifetimeLimitSeconds = false; @@ -474,6 +479,28 @@ var $config = extendWorkload($config, function($config, $super) { assert.commandWorked(bulk.execute()); }; + $config.data.overrideInternalTransactionsReapThreshold = + function overrideInternalTransactionsReapThreshold(cluster) { + const newThreshold = this.generateRandomInt(0, 4); + print("Setting internalSessionsReapThreshold to " + newThreshold); + cluster.executeOnMongodNodes((db) => { + const res = assert.commandWorked( + db.adminCommand({setParameter: 1, internalSessionsReapThreshold: newThreshold})); + this.originalInternalSessionReapThreshold[db.getMongo().host] = res.was; + }); + }; + + $config.data.restoreInternalTransactionsReapThreshold = + function restoreInternalTransactionsReapThreshold(cluster) { + cluster.executeOnMongodNodes((db) => { + assert.commandWorked(db.adminCommand({ + setParameter: 1, + internalSessionsReapThreshold: + this.originalInternalSessionReapThreshold[db.getMongo().host] + })); + }); + }; + $config.data.overrideStoreFindAndModifyImagesInSideCollection = function overrideStoreFindAndModifyImagesInSideCollection(cluster) { // Store the findAndModify images in the oplog half of the time. @@ -529,6 +556,9 @@ var $config = extendWorkload($config, function($config, $super) { this.insertInitialDocuments(db, collName, tid); } } + if (this.overrideReapThreshold) { + this.overrideInternalTransactionsReapThreshold(cluster); + } this.overrideStoreFindAndModifyImagesInSideCollection(cluster); if (this.lowerTransactionLifetimeLimitSeconds) { this.overrideTransactionLifetimeLimit(cluster); @@ -536,6 +566,9 @@ var $config = extendWorkload($config, function($config, $super) { }; $config.teardown = function teardown(db, collName, cluster) { + if (this.overrideReapThreshold) { + this.restoreInternalTransactionsReapThreshold(cluster); + } this.restoreStoreFindAndModifyImagesInSideCollection(cluster); if (this.lowerTransactionLifetimeLimitSeconds) { this.restoreTransactionLifetimeLimit(cluster); diff --git a/jstests/sharding/internal_txns/eager_reaping.js b/jstests/sharding/internal_txns/eager_reaping.js new file mode 100644 index 00000000000..cb584c77b70 --- /dev/null +++ b/jstests/sharding/internal_txns/eager_reaping.js @@ -0,0 +1,215 @@ +/* + * Tests that transaction records for retryable internal sessions are reaped eagerly when they are + * reaped early from memory. + * + * @tags: [requires_fcv_60, uses_transactions] + */ +(function() { +"use strict"; + +const st = new ShardingTest({shards: 1, config: 1}); + +const kDbName = "testDb"; +const kCollName = "testColl"; +const mongosTestColl = st.s.getCollection(kDbName + "." + kCollName); +assert.commandWorked(mongosTestColl.insert({x: 1})); // Set up the collection. 
+ +function assertNumEntries(conn, + {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries}) { + const filter = {"_id.id": sessionUUID}; + + const imageColl = conn.getCollection("config.image_collection"); + assert.eq(numImageCollectionEntries, + imageColl.find(filter).itcount(), + tojson(imageColl.find().toArray())); + + const transactionsColl = conn.getCollection("config.transactions"); + assert.eq(numTransactionsCollEntries, + transactionsColl.find(filter).itcount(), + tojson(transactionsColl.find().toArray())); +} + +function runInternalTxn(conn, lsid, txnNumber) { + const testInternalTxnCmdObj = { + testInternalTransactions: 1, + commandInfos: [{ + dbName: kDbName, + command: { + // Use findAndModify to generate image collection entries. + findAndModify: kCollName, + query: {x: 1}, + update: {$inc: {counter: 1}}, + stmtId: NumberInt(3), + } + }], + lsid: lsid, + + }; + if (txnNumber !== undefined) { + testInternalTxnCmdObj.txnNumber = NumberLong(txnNumber); + } + assert.commandWorked(conn.adminCommand(testInternalTxnCmdObj)); +} + +function assertNumEntriesSoon( + shardConn, {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries}) { + // Sleep a little so it's likely the reaping has finished and we can avoid spamming the logs. + sleep(100); + assert.soonNoExcept(() => { + assertNumEntries(shardConn, + {sessionUUID, numImageCollectionEntries, numTransactionsCollEntries}); + return true; + }, "Expected internal transactions to be reaped eventually", undefined, 100 /* interval */); +} + +function runTest(conn, shardConn) { + // Lower the threshold to speed up the test and verify it's respected. + const reapThreshold = 100; + assert.commandWorked( + shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: reapThreshold})); + + // + // Reaping happens at the threshold. + // + + let parentLsid = {id: UUID()}; + + // No transaction records at first. + assertNumEntries( + shardConn, + {sessionUUID: parentLsid.id, numImageCollectionEntries: 0, numTransactionsCollEntries: 0}); + + // Records build up until the reap threshold. + for (let i = 0; i < reapThreshold; i++) { + runInternalTxn(conn, parentLsid, i); + } + assertNumEntries(shardConn, { + sessionUUID: parentLsid.id, + numImageCollectionEntries: reapThreshold, + numTransactionsCollEntries: reapThreshold + }); + + // Push the number of eagerly reaped sessions up to the threshold and verify this triggers + // reaping them. + runInternalTxn(conn, parentLsid, reapThreshold + 1); + assertNumEntriesSoon( + shardConn, + {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 1}); + + // + // Reaping can run more than once. + // + + for (let i = 0; i < reapThreshold; i++) { + // We're on the same session as before, so pick higher txnNumbers than used before. + const txnNumber = i + reapThreshold + 1; + runInternalTxn(conn, parentLsid, txnNumber); + } + assertNumEntriesSoon( + shardConn, + {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 1}); + + // + // Buffered sessions are cleared on failover. 
+ // + + parentLsid = {id: UUID()}; + + const numBeforeFailover = (reapThreshold / 2) + 1; + for (let i = 0; i < numBeforeFailover; i++) { + runInternalTxn(conn, parentLsid, i); + } + assertNumEntries(shardConn, { + sessionUUID: parentLsid.id, + numImageCollectionEntries: numBeforeFailover, + numTransactionsCollEntries: numBeforeFailover + }); + + // Step down and back up the new primary and verify it only reaps newly expired internal + // sessions. + + assert.commandWorked( + shardConn.adminCommand({replSetStepDown: ReplSetTest.kForeverSecs, force: true})); + assert.commandWorked(shardConn.adminCommand({replSetFreeze: 0})); + st.rs0.stepUp(shardConn); + shardConn = st.rs0.getPrimary(); + + const numAfterFailover = (reapThreshold / 2) + 1; + assert(numAfterFailover + numBeforeFailover > reapThreshold); + for (let i = 0; i < numAfterFailover; i++) { + const txnNumber = i + numBeforeFailover; // Account for txnNumbers used before failover. + runInternalTxn(conn, parentLsid, txnNumber); + } + assertNumEntries(shardConn, { + sessionUUID: parentLsid.id, + numImageCollectionEntries: numBeforeFailover + numAfterFailover, + numTransactionsCollEntries: numBeforeFailover + numAfterFailover + }); + + // Insert up to the threshold and verify a reap is triggered. + for (let i = 0; i < reapThreshold - numAfterFailover; i++) { + const txnNumber = i + 1000; // Account for txnNumbers used earlier. + runInternalTxn(conn, parentLsid, txnNumber); + } + assertNumEntriesSoon(shardConn, { + sessionUUID: parentLsid.id, + numImageCollectionEntries: numBeforeFailover, + numTransactionsCollEntries: numBeforeFailover + }); + + // + // Reaping ignores non-retryable sessions and parent sessions. + // + + parentLsid = {id: UUID()}; + + runInternalTxn(conn, parentLsid); // Non-retryable transaction. + assert.commandWorked(conn.getDB("test").runCommand({ + insert: "foo", + documents: [{x: 1}], + lsid: parentLsid, + txnNumber: NumberLong(0), + stmtId: NumberInt(0) + })); + + // Run enough retryable transactions to trigger a reap. + for (let i = 0; i < reapThreshold + 1; i++) { + const txnNumber = i + 1; // Account for the retryable write's txnNumber. + runInternalTxn(conn, parentLsid, txnNumber); + } + // Expect 3: the parent entry, the non-retryable entry, and the latest retryable child. Only the + // retryable child has an image entry, so just expect 1 of those. + assertNumEntriesSoon( + shardConn, + {sessionUUID: parentLsid.id, numImageCollectionEntries: 1, numTransactionsCollEntries: 3}); +} + +// Validates behavior about the configurable reap threshold server parameter. +function runParameterTest(conn, shardConn) { + // Must be a number. + assert.commandFailedWithCode( + shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: "wontwork"}), + ErrorCodes.BadValue); + + // Can't be set negative. + assert.commandFailedWithCode( + shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: -1}), + ErrorCodes.BadValue); + + // Can be set to 0 or a positive value. + assert.commandWorked( + shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 0})); + assert.commandWorked( + shardConn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 12345})); + + // Doesn't exist on mongos. This fails with no error code so check the errmsg. 
+ const res = assert.commandFailed( + conn.adminCommand({setParameter: 1, internalSessionsReapThreshold: 222})); + assert(res.errmsg.includes("unrecognized parameter"), tojson(res)); +} + +runTest(st.s, st.rs0.getPrimary()); +runParameterTest(st.s, st.rs0.getPrimary()); + +st.stop(); +})(); diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 672c234dd3d..1518f7bfcb0 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -804,6 +804,7 @@ env.Library( env.Library( target='transaction', source=[ + 'internal_transactions_reap_service.cpp', 'retryable_writes_stats.cpp', 'server_transactions_metrics.cpp', 'session_catalog_mongod.cpp', @@ -811,6 +812,7 @@ env.Library( 'transaction_metrics_observer.cpp', 'transaction_participant.cpp', 'transaction_participant_resource_yielder.cpp', + 'internal_transactions_reap_service.idl', 'session_txn_record.idl', 'transaction_participant.idl', 'transactions_stats.idl', @@ -827,6 +829,7 @@ env.Library( '$BUILD_DIR/mongo/db/commands/server_status', '$BUILD_DIR/mongo/db/commands/test_commands_enabled', '$BUILD_DIR/mongo/db/index/index_access_method', + '$BUILD_DIR/mongo/db/repl/replica_set_aware_service', '$BUILD_DIR/mongo/idl/idl_parser', '$BUILD_DIR/mongo/idl/server_parameter', '$BUILD_DIR/mongo/s/sharding_router_api', @@ -2669,6 +2672,7 @@ if wiredtiger: 'index_build_entry_helpers_test.cpp', 'index_builds_coordinator_mongod_test.cpp', 'internal_session_pool_test.cpp', + 'internal_transactions_reap_service_test.cpp', 'keypattern_test.cpp', 'keys_collection_document_test.cpp', 'logical_session_cache_test.cpp', diff --git a/src/mongo/db/internal_transactions_reap_service.cpp b/src/mongo/db/internal_transactions_reap_service.cpp new file mode 100644 index 00000000000..997de84b376 --- /dev/null +++ b/src/mongo/db/internal_transactions_reap_service.cpp @@ -0,0 +1,161 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kTransaction + +#include "mongo/db/internal_transactions_reap_service.h" + +#include "mongo/db/internal_transactions_reap_service_gen.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/logv2/log.h" + +namespace mongo { + +namespace { + +MONGO_FAIL_POINT_DEFINE(pauseInternalTransactionsReaperAfterSwap); + +const auto serviceDecoration = ServiceContext::declareDecoration(); + +} // namespace + +InternalTransactionsReapService* InternalTransactionsReapService::get( + ServiceContext* serviceContext) { + return &serviceDecoration(serviceContext); +} + +InternalTransactionsReapService* InternalTransactionsReapService::get(OperationContext* opCtx) { + return get(opCtx->getServiceContext()); +} + +const ReplicaSetAwareServiceRegistry::Registerer + internalTransactionsReapServiceRegisterer("InternalTransactionsReapService"); + +InternalTransactionsReapService::InternalTransactionsReapService() { + _threadPool = std::make_shared([] { + ThreadPool::Options options; + options.poolName = "InternalTransactionsReapService"; + options.minThreads = 0; + options.maxThreads = 1; + return options; + }()); +} + +void InternalTransactionsReapService::onEagerlyReapedSessions( + ServiceContext* service, std::vector lsidsToRemove) { + InternalTransactionsReapService::get(service)->addEagerlyReapedSessions( + service, std::move(lsidsToRemove)); +} + +void InternalTransactionsReapService::addEagerlyReapedSessions( + ServiceContext* service, std::vector lsidsToRemove) { + auto reapThreshold = internalSessionsReapThreshold.loadRelaxed(); + if (reapThreshold == 0) { + // A threshold of 0 disables eager reaping. + return; + } + + stdx::lock_guard lg(_mutex); + if (!_enabled) { + return; + } + + _lsidsToEagerlyReap.insert( + _lsidsToEagerlyReap.end(), lsidsToRemove.begin(), lsidsToRemove.end()); + + // reapThreshold is an integer, but is always greater than or equal to 0 so it + // should be safe to cast to size_t. + bool isAtThreshold = _lsidsToEagerlyReap.size() >= static_cast(reapThreshold); + bool isCurrentlyDrainingSessions = _drainedSessionsFuture && !_drainedSessionsFuture->isReady(); + + if (isAtThreshold && !isCurrentlyDrainingSessions) { + // Kick off reaping the buffer of internal transaction sessions. 
+ _drainedSessionsFuture.reset(); + _drainedSessionsFuture = ExecutorFuture(_threadPool).then([this, service] { + _reapInternalTransactions(service); + }); + } +} + +void InternalTransactionsReapService::onStartup(OperationContext* opCtx) { + _threadPool->startup(); +} + +void InternalTransactionsReapService::onShutdown() { + _threadPool->shutdown(); + _threadPool->join(); +} + +void InternalTransactionsReapService::_reapInternalTransactions(ServiceContext* service) try { + ThreadClient tc("reap-internal-transactions", service); + { + stdx::lock_guard lk(*tc.get()); + tc->setSystemOperationKillableByStepdown(lk); + } + auto uniqueOpCtx = tc->makeOperationContext(); + auto opCtx = uniqueOpCtx.get(); + opCtx->setAlwaysInterruptAtStepDownOrUp(); + + std::vector lsidsToRemove; + { + using std::swap; + stdx::lock_guard lg(_mutex); + swap(lsidsToRemove, _lsidsToEagerlyReap); + } + + pauseInternalTransactionsReaperAfterSwap.pauseWhileSet(opCtx); + + LOGV2_DEBUG(6697300, + 2, + "Eagerly reaping internal transactions from disk", + "numToReap"_attr = lsidsToRemove.size()); + + auto numReaped = MongoDSessionCatalog::removeSessionsTransactionRecords(opCtx, lsidsToRemove); + + LOGV2_DEBUG( + 6697301, 2, "Eagerly reaped internal transactions from disk", "numReaped"_attr = numReaped); +} catch (const DBException& ex) { + // Ignore errors. + LOGV2(6697302, + "Failed to eagerly reap internal transactions from disk", + "error"_attr = redact(ex)); +} + +void InternalTransactionsReapService::waitForCurrentDrain_forTest() { + if (_drainedSessionsFuture) { + _drainedSessionsFuture->wait(); + } + return; +} + +bool InternalTransactionsReapService::hasCurrentDrain_forTest() { + return _drainedSessionsFuture && !_drainedSessionsFuture->isReady(); +} + +} // namespace mongo diff --git a/src/mongo/db/internal_transactions_reap_service.h b/src/mongo/db/internal_transactions_reap_service.h new file mode 100644 index 00000000000..75fd5aba30b --- /dev/null +++ b/src/mongo/db/internal_transactions_reap_service.h @@ -0,0 +1,111 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#pragma once + +#include "mongo/db/operation_context.h" +#include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/util/concurrency/thread_pool.h" + +namespace mongo { + +/** + * Service responsible for removing completed internal transactions persisted metadata when they can + * no longer be used. Only enabled on replica set primary nodes. + */ +class InternalTransactionsReapService + : public ReplicaSetAwareService { +public: + InternalTransactionsReapService(); + + static InternalTransactionsReapService* get(ServiceContext* serviceContext); + static InternalTransactionsReapService* get(OperationContext* opCtx); + + /** + * Adds the eagerly reaped sessions to the reap service to be buffered and later reaped. Does + * not verify they have actually expired, so callers must guarantee they are safe to remove. + */ + static void onEagerlyReapedSessions(ServiceContext* service, + std::vector lsidsToRemove); + + /** + * Buffers the sessions to be reaped when the max batch size has been reached, upon which a task + * is scheduled to reap the transactions. + */ + void addEagerlyReapedSessions(ServiceContext* service, + std::vector lsidsToRemove); + + /** + * Used in tests to wait for a kicked off drain to complete. + */ + void waitForCurrentDrain_forTest(); + + /** + * Used in tests to detect when the drain task is running. + */ + bool hasCurrentDrain_forTest(); + +private: + /** + * Will remove all currently buffered sessions ids from config.transactions and + * config.image_collection. + */ + void _reapInternalTransactions(ServiceContext* service); + + void onStartup(OperationContext* opCtx) override final; + void onShutdown() override final; + + void onStepUpComplete(OperationContext* opCtx, long long term) override final { + stdx::lock_guard lg(_mutex); + _enabled = true; + } + + void onStepDown() override final { + stdx::lock_guard lg(_mutex); + _enabled = false; + _lsidsToEagerlyReap.clear(); + } + + void onInitialDataAvailable(OperationContext* opCtx, + bool isMajorityDataAvailable) override final {} + void onStepUpBegin(OperationContext* opCtx, long long term) override final {} + void onBecomeArbiter() override final {} + + std::shared_ptr _threadPool; + + // Protects the state below. + mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), + "InternalTransactionsReapService::_mutex"); + + bool _enabled{false}; + boost::optional> _drainedSessionsFuture; + std::vector _lsidsToEagerlyReap; +}; + +} // namespace mongo diff --git a/src/mongo/db/internal_transactions_reap_service.idl b/src/mongo/db/internal_transactions_reap_service.idl new file mode 100644 index 00000000000..86c8997f3aa --- /dev/null +++ b/src/mongo/db/internal_transactions_reap_service.idl @@ -0,0 +1,45 @@ +# Copyright (C) 2022-present MongoDB, Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the Server Side Public License, version 1, +# as published by MongoDB, Inc. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# Server Side Public License for more details. +# +# You should have received a copy of the Server Side Public License +# along with this program. If not, see +# . 
+# +# As a special exception, the copyright holders give permission to link the +# code of portions of this program with the OpenSSL library under certain +# conditions as described in each individual source file and distribute +# linked combinations including the program with the OpenSSL library. You +# must comply with the Server Side Public License in all respects for +# all of the code used other than as permitted herein. If you modify file(s) +# with this exception, you may extend this exception to your version of the +# file(s), but you are not obligated to do so. If you do not wish to do so, +# delete this exception statement from your version. If you delete this +# exception statement from all source files in the program, then also delete +# it in the license file. +# + +# This IDL file describes the BSON format for a LogicalSessionId, and +# handles the serialization to and deserialization from its BSON representation +# for that class. + +global: + cpp_namespace: "mongo" + +server_parameters: + internalSessionsReapThreshold: + description: 'The minimum number of expired internal sessions that must be buffered in memory + before they are early reaped. 0 disables early reaping.' + set_at: [startup, runtime] + cpp_vartype: 'AtomicWord' + cpp_varname: internalSessionsReapThreshold + default: 1000 + validator: + gte: 0 diff --git a/src/mongo/db/internal_transactions_reap_service_test.cpp b/src/mongo/db/internal_transactions_reap_service_test.cpp new file mode 100644 index 00000000000..0e9d956b627 --- /dev/null +++ b/src/mongo/db/internal_transactions_reap_service_test.cpp @@ -0,0 +1,385 @@ +/** + * Copyright (C) 2022-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * . + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/internal_transactions_reap_service.h" +#include "mongo/db/internal_transactions_reap_service_gen.h" +#include "mongo/db/repl/image_collection_entry_gen.h" +#include "mongo/db/repl/replica_set_aware_service.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/repl/storage_interface_impl.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_catalog_mongod.h" +#include "mongo/db/session_txn_record_gen.h" + +namespace mongo { +namespace { + +class InternalTransactionsReapServiceTest : public ServiceContextMongoDTest { +protected: + void setUp() override { + ServiceContextMongoDTest::setUp(); + + const auto service = getServiceContext(); + _opCtxHolder = makeOperationContext(); + const auto opCtx = _opCtxHolder.get(); + + repl::StorageInterface::set(service, std::make_unique()); + repl::ReplicationCoordinator::set( + service, + std::make_unique(service, createReplSettings())); + repl::createOplog(opCtx); + + ReplicaSetAwareServiceRegistry::get(service).onStartup(opCtx); + + auto replCoord = repl::ReplicationCoordinator::get(opCtx); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + MongoDSessionCatalog::onStepUp(opCtx); + + // Lower the reap threshold for faster tests. + _originalInternalSessionsReapThreshold = internalSessionsReapThreshold.load(); + internalSessionsReapThreshold.store(50); + + onStepUp(); + } + + void tearDown() override { + internalSessionsReapThreshold.store(_originalInternalSessionsReapThreshold); + _opCtxHolder.reset(); + ReplicaSetAwareServiceRegistry::get(getServiceContext()).onShutdown(); + ServiceContextMongoDTest::tearDown(); + } + + InternalTransactionsReapService* reapService() { + return InternalTransactionsReapService::get(getServiceContext()); + } + + OperationContext* opCtx() const { + return _opCtxHolder.get(); + } + + void onStepUp() { + // Set a last applied opTime to satisfy an invariant in the primary only service registry. + auto newLastAppliedOpTime = repl::OpTime(Timestamp(1, 1), 1); + repl::ReplicationCoordinator::get(getServiceContext()) + ->setMyLastAppliedOpTimeAndWallTime({newLastAppliedOpTime, Date_t()}); + ReplicaSetAwareServiceRegistry::get(getServiceContext()).onStepUpComplete(opCtx(), 1LL); + } + + void onStepDown() { + ReplicaSetAwareServiceRegistry::get(getServiceContext()).onStepDown(); + } + + // Inserts entries for the given sessions into config.transactions and config.image_collection. 
+ void insertSessionDocuments(const std::vector& lsids, + bool skipImageEntry = false, + bool skipSessionEntry = false) { + DBDirectClient client(opCtx()); + + if (!skipImageEntry) { + write_ops::checkWriteErrors(client.insert([&] { + write_ops::InsertCommandRequest imageInsertOp( + NamespaceString::kConfigImagesNamespace); + imageInsertOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + imageInsertOp.setDocuments([&] { + std::vector docs; + for (const auto& lsid : lsids) { + repl::ImageEntry entry( + lsid, 0, Timestamp(1, 1), repl::RetryImageEnum::kPreImage, BSONObj()); + docs.emplace_back(entry.toBSON()); + } + return docs; + }()); + return imageInsertOp; + }())); + } + + if (!skipSessionEntry) { + auto sessionDeleteReply = write_ops::checkWriteErrors(client.insert([&] { + write_ops::InsertCommandRequest sessionInsertOp( + NamespaceString::kSessionTransactionsTableNamespace); + sessionInsertOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + sessionInsertOp.setDocuments([&] { + std::vector docs; + for (const auto& lsid : lsids) { + SessionTxnRecord record(lsid, 0, repl::OpTime(), Date_t()); + docs.emplace_back(record.toBSON()); + } + return docs; + }()); + return sessionInsertOp; + }())); + } + } + + void assertInSessionsCollection(const std::vector& lsids) { + DBDirectClient client(opCtx()); + for (const auto& lsid : lsids) { + auto numImageEntries = + client.count(NamespaceString::kConfigImagesNamespace, + BSON(repl::ImageEntry::k_idFieldName << lsid.toBSON())); + ASSERT_EQ(1, numImageEntries) << lsid.toBSON(); + auto numSessionEntries = + client.count(NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())); + ASSERT_EQ(1, numSessionEntries) << lsid.toBSON(); + } + } + + void assertNotInSessionsCollection(const std::vector& lsids) { + DBDirectClient client(opCtx()); + for (const auto& lsid : lsids) { + auto numImageEntries = + client.count(NamespaceString::kConfigImagesNamespace, + BSON(repl::ImageEntry::k_idFieldName << lsid.toBSON())); + ASSERT_EQ(0, numImageEntries) << lsid.toBSON(); + auto numSessionEntries = + client.count(NamespaceString::kSessionTransactionsTableNamespace, + BSON(SessionTxnRecord::kSessionIdFieldName << lsid.toBSON())); + ASSERT_EQ(0, numSessionEntries) << lsid.toBSON(); + } + } + + void assertNoPersistedSessions() { + DBDirectClient client(opCtx()); + auto numImageEntries = client.count(NamespaceString::kConfigImagesNamespace, BSONObj()); + ASSERT_EQ(0, numImageEntries); + auto numSessionEntries = + client.count(NamespaceString::kSessionTransactionsTableNamespace, BSONObj()); + ASSERT_EQ(0, numSessionEntries); + } + + std::vector generateLsids(int num) { + std::vector lsids; + for (int i = 0; i < num; i++) { + lsids.push_back(makeLogicalSessionIdForTest()); + } + return lsids; + } + +private: + repl::ReplSettings createReplSettings() { + repl::ReplSettings settings; + settings.setOplogSizeBytes(5 * 1024 * 1024); + settings.setReplSetString("mySet/node1:12345"); + return settings; + } + + int _originalInternalSessionsReapThreshold; + ServiceContext::UniqueOperationContext _opCtxHolder; +}; + +TEST_F(InternalTransactionsReapServiceTest, DoesNotReapUntilThreshold) { + auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1); + insertSessionDocuments(lsidsBelowThreshold); + 
reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold); + assertInSessionsCollection(lsidsBelowThreshold); + + auto lsidToReachThreshold = generateLsids(1); + insertSessionDocuments(lsidToReachThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +} + +TEST_F(InternalTransactionsReapServiceTest, ExceedingThresholdTriggersReap) { + auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1); + insertSessionDocuments(lsidsBelowThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold); + assertInSessionsCollection(lsidsBelowThreshold); + + // Passing the threshold by a large amount is fine and will trigger removing all accumulated + // sessions, not just the threshold amount. + auto lsidsToExceedThreshold = generateLsids(internalSessionsReapThreshold.load() * 2); + insertSessionDocuments(lsidsToExceedThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToExceedThreshold); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +} + +TEST_F(InternalTransactionsReapServiceTest, CanReapMultipleRounds) { + auto reap = [&] { + auto lsidsBelowThreshold = generateLsids(internalSessionsReapThreshold.load() - 1); + insertSessionDocuments(lsidsBelowThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsBelowThreshold); + assertInSessionsCollection(lsidsBelowThreshold); + + auto lsidToReachThreshold = generateLsids(1); + insertSessionDocuments(lsidToReachThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidToReachThreshold); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); + }; + + // Reaps can be triggered multiple times. + reap(); + reap(); + reap(); +} + +TEST_F(InternalTransactionsReapServiceTest, CanReapMoreThanMaxSessionDeletionBatchSize) { + auto lsidsOverBatchSize = + generateLsids(MongoDSessionCatalog::kMaxSessionDeletionBatchSize + 11); + insertSessionDocuments(lsidsOverBatchSize); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsOverBatchSize); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +} + +TEST_F(InternalTransactionsReapServiceTest, OnlySchedulesOneReapAtATime) { + auto pauseReaperThreadFp = + globalFailPointRegistry().find("pauseInternalTransactionsReaperAfterSwap"); + auto timesEnteredFailPoint = pauseReaperThreadFp->setMode(FailPoint::alwaysOn); + + auto lsidsForInitialReap = generateLsids(internalSessionsReapThreshold.load()); + insertSessionDocuments(lsidsForInitialReap); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsForInitialReap); + ASSERT(reapService()->hasCurrentDrain_forTest()); + assertInSessionsCollection(lsidsForInitialReap); + + // Wait for fail point. + pauseReaperThreadFp->waitForTimesEntered(timesEnteredFailPoint + 1); + + auto lsidsDuringReap = generateLsids(internalSessionsReapThreshold.load()); + insertSessionDocuments(lsidsDuringReap); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsDuringReap); + ASSERT(reapService()->hasCurrentDrain_forTest()); + assertInSessionsCollection(lsidsDuringReap); + + // Disable fail point and verify the concurrent sessions weren't reaped. 
+ pauseReaperThreadFp->setMode(FailPoint::off); + reapService()->waitForCurrentDrain_forTest(); + assertNotInSessionsCollection(lsidsForInitialReap); + assertInSessionsCollection(lsidsDuringReap); + + // The concurrently added sessions will be reaped in the next round. + reapService()->addEagerlyReapedSessions(getServiceContext(), generateLsids(1)); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +}; + +TEST_F(InternalTransactionsReapServiceTest, DoesNotErrorIfReceivedSameSessionTwice) { + auto lsids = generateLsids(2); + insertSessionDocuments(lsids); + + reapService()->addEagerlyReapedSessions(getServiceContext(), lsids); + assertInSessionsCollection(lsids); + + reapService()->addEagerlyReapedSessions(getServiceContext(), lsids); + assertInSessionsCollection(lsids); + + auto lsidsToReachThreshold = generateLsids(internalSessionsReapThreshold.load()); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToReachThreshold); + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +} + +TEST_F(InternalTransactionsReapServiceTest, DoesNotErrorIfSessionOrImageIsNotOnDisk) { + auto lsidsNoImage = generateLsids(2); + insertSessionDocuments(lsidsNoImage, true /* skipImageEntry */); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoImage); + + auto lsidsNoSession = generateLsids(2); + insertSessionDocuments(lsidsNoSession, false /* skipImageEntry */, true /* skipSessionEntry */); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoSession); + + auto normalLsids = generateLsids(3); + insertSessionDocuments(normalLsids); + reapService()->addEagerlyReapedSessions(getServiceContext(), normalLsids); + + auto lsidsNoImageOrSession = generateLsids(internalSessionsReapThreshold.load()); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsNoImageOrSession); + + reapService()->waitForCurrentDrain_forTest(); + assertNoPersistedSessions(); +} + +TEST_F(InternalTransactionsReapServiceTest, DoesNotReapAsSecondaryAndClearsSessionsOnStepdown) { + auto lsidsAsPrimary = generateLsids(2); + insertSessionDocuments(lsidsAsPrimary); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsAsPrimary); + assertInSessionsCollection(lsidsAsPrimary); + + onStepDown(); + + auto lsidsAsSecondary = generateLsids(internalSessionsReapThreshold.load()); + insertSessionDocuments(lsidsAsSecondary); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsAsSecondary); + ASSERT_FALSE(reapService()->hasCurrentDrain_forTest()); + assertInSessionsCollection(lsidsAsPrimary); + assertInSessionsCollection(lsidsAsSecondary); + + onStepUp(); + + // Despite having seen more than the threshold as a secondary and previous primary, there should + // have been no sessions buffered so this will not trigger a refresh. 
+ auto newLsidAsPrimary = generateLsids(1); + insertSessionDocuments(newLsidAsPrimary); + reapService()->addEagerlyReapedSessions(getServiceContext(), newLsidAsPrimary); + ASSERT_FALSE(reapService()->hasCurrentDrain_forTest()); + assertInSessionsCollection(lsidsAsPrimary); + assertInSessionsCollection(lsidsAsSecondary); + assertInSessionsCollection(newLsidAsPrimary); + + auto lsidsToReachThreshold = generateLsids(internalSessionsReapThreshold.load()); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsToReachThreshold); + reapService()->waitForCurrentDrain_forTest(); + + // The previous sessions were missed and will remain until the normal logical session reaper + // logic removes them. + assertInSessionsCollection(lsidsAsPrimary); + assertInSessionsCollection(lsidsAsSecondary); + + // But newly added sessions will be reaped. + assertNotInSessionsCollection(newLsidAsPrimary); + assertNotInSessionsCollection(lsidsToReachThreshold); +} + +TEST_F(InternalTransactionsReapServiceTest, ThresholdOfZeroDisablesReaping) { + // tearDown() will restore the original value already, so there's no need to do it in the test. + internalSessionsReapThreshold.store(0); + + auto lsidsOverZeroThreshold = generateLsids(10); + insertSessionDocuments(lsidsOverZeroThreshold); + reapService()->addEagerlyReapedSessions(getServiceContext(), lsidsOverZeroThreshold); + ASSERT_FALSE(reapService()->hasCurrentDrain_forTest()); + assertInSessionsCollection(lsidsOverZeroThreshold); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/db/mongod_main.cpp b/src/mongo/db/mongod_main.cpp index a3a56f705f6..52a9c4651d6 100644 --- a/src/mongo/db/mongod_main.cpp +++ b/src/mongo/db/mongod_main.cpp @@ -91,6 +91,7 @@ #include "mongo/db/index_names.h" #include "mongo/db/initialize_server_global_state.h" #include "mongo/db/initialize_snmp.h" +#include "mongo/db/internal_transactions_reap_service.h" #include "mongo/db/introspect.h" #include "mongo/db/json.h" #include "mongo/db/keys_collection_client_direct.h" @@ -158,6 +159,7 @@ #include "mongo/db/serverless/shard_split_donor_service.h" #include "mongo/db/service_context.h" #include "mongo/db/service_entry_point_mongod.h" +#include "mongo/db/session_catalog.h" #include "mongo/db/session_killer.h" #include "mongo/db/startup_recovery.h" #include "mongo/db/startup_warnings_mongod.h" @@ -1514,6 +1516,8 @@ int mongod_main(int argc, char* argv[]) { setUpReplication(service); setUpObservers(service); service->setServiceEntryPoint(std::make_unique(service)); + SessionCatalog::get(service)->setOnEagerlyReapedSessionsFn( + InternalTransactionsReapService::onEagerlyReapedSessions); ErrorExtraInfo::invariantHaveAllParsers(); diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 68ff1654ed1..37a093576fa 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -309,7 +309,7 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, Session* session, boost::optional killToken, boost::optional clientTxnNumberStarted) { - stdx::lock_guard lg(_mutex); + stdx::unique_lock ul(_mutex); // Make sure we have exactly the same session on the map and that it is still associated with an // operation context (meaning checked-out) @@ -319,6 +319,8 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, dassert(killToken->lsidToKill == session->getSessionId()); } + ServiceContext* service = sri->checkoutOpCtx->getServiceContext(); + sri->checkoutOpCtx = nullptr; sri->availableCondVar.notify_all(); @@ -327,16 
+329,22 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, --sri->killsRequested; } + std::vector eagerlyReapedSessions; if (clientTxnNumberStarted.has_value()) { // Since the given txnNumber successfully started, we know any child sessions with older // txnNumbers can be discarded. This needed to wait until a transaction started because that // can fail, e.g. if the active transaction is prepared. auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) { - ObservableSession osession(lg, sri, &it.second); + ObservableSession osession(ul, sri, &it.second); if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) { osession.markForReap(ObservableSession::ReapMode::kExclusive); } - return osession._shouldBeReaped(); + + bool willReap = osession._shouldBeReaped(); + if (willReap) { + eagerlyReapedSessions.push_back(std::move(it.first)); + } + return willReap; }); LOGV2_DEBUG(6685200, @@ -347,6 +355,13 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri, "childSessionsRemaining"_attr = sri->childSessions.size(), "numReaped"_attr = numReaped); } + + invariant(ul); + ul.unlock(); + + if (eagerlyReapedSessions.size() && _onEagerlyReapedSessionsFn) { + (*_onEagerlyReapedSessionsFn)(service, std::move(eagerlyReapedSessions)); + } } Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) { diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index b049550efd9..6201d6fd72b 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -60,6 +60,9 @@ class SessionCatalog { friend class OperationContextSession; public: + using OnEagerlyReapedSessionsFn = + unique_function)>; + class ScopedCheckedOutSession; class SessionToKill; @@ -138,6 +141,15 @@ public: */ size_t size() const; + /** + * Registers a callback to run when sessions are "eagerly" reaped from the catalog, ie without + * waiting for a logical session cache refresh. + */ + void setOnEagerlyReapedSessionsFn(OnEagerlyReapedSessionsFn fn) { + invariant(!_onEagerlyReapedSessionsFn); + _onEagerlyReapedSessionsFn = std::move(fn); + } + private: /** * Tracks the runtime info for transaction sessions that corresponds to the same logical @@ -210,6 +222,11 @@ private: boost::optional killToken, boost::optional clientTxnNumberStarted); + // Called when sessions are reaped from memory "eagerly" ie directly by the SessionCatalog + // without waiting for a logical session cache refresh. Note this is set at process startup + // before multi-threading is enabled, so no synchronization is necessary. 
+ boost::optional _onEagerlyReapedSessionsFn; + // Protects the state below mutable Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(4), "SessionCatalog::_mutex"); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index eeb1ffb175a..fcefa1ed69b 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -208,12 +208,70 @@ const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; +template +int removeSessionsTransactionRecordsFromDisk(OperationContext* opCtx, + const SessionContainer& transactionSessionIdsToReap) { + if (transactionSessionIdsToReap.empty()) { + return 0; + } + + // Remove the config.image_collection entries for the expired transaction session ids. We first + // delete any images belonging to sessions about to be reaped, followed by the sessions. This + // way if there's a failure, we'll only be left with sessions that have a dangling reference + // to an image. Session reaping will rediscover the sessions to delete and try again. + // + // We opt for this rather than performing the two sets of deletes in a single transaction simply + // to reduce code complexity. + DBDirectClient client(opCtx); + write_ops::checkWriteErrors(client.remove([&] { + write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace); + imageDeleteOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + imageDeleteOp.setDeletes([&] { + std::vector entries; + for (const auto& transactionSessionId : transactionSessionIdsToReap) { + entries.emplace_back( + BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), + false /* multi = false */); + } + return entries; + }()); + return imageDeleteOp; + }())); + + // Remove the config.transaction entries for the expired transaction session ids. + auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] { + write_ops::DeleteCommandRequest sessionDeleteOp( + NamespaceString::kSessionTransactionsTableNamespace); + sessionDeleteOp.setWriteCommandRequestBase([] { + write_ops::WriteCommandRequestBase base; + base.setOrdered(false); + return base; + }()); + sessionDeleteOp.setDeletes([&] { + std::vector entries; + for (const auto& transactionSessionId : transactionSessionIdsToReap) { + entries.emplace_back( + BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), + false /* multi = false */); + } + return entries; + }()); + return sessionDeleteOp; + }())); + + return sessionDeleteReply.getN(); +} + /** * Removes the the config.transactions and the config.image_collection entries for the transaction * sessions in 'expiredTransactionSessionIdsNotInUse' that are safe to reap. Returns the number * of transaction sessions whose entries were removed. */ -int removeSessionsTransactionRecords( +int removeSessionsTransactionRecordsIfExpired( OperationContext* opCtx, SessionsCollection& sessionsCollection, const LogicalSessionIdSet& expiredTransactionSessionIdsNotInUse) { @@ -267,59 +325,7 @@ int removeSessionsTransactionRecords( } } - if (transactionSessionIdsToReap.empty()) { - return 0; - } - - // Remove the config.image_collection entries for the expired transaction session ids. 
We first - // delete any images belonging to sessions about to be reaped, followed by the sessions. This - // way if there's a failure, we'll only be left with sessions that have a dangling reference - // to an image. Session reaping will rediscover the sessions to delete and try again. - // - // We opt for this rather than performing the two sets of deletes in a single transaction simply - // to reduce code complexity. - DBDirectClient client(opCtx); - write_ops::checkWriteErrors(client.remove([&] { - write_ops::DeleteCommandRequest imageDeleteOp(NamespaceString::kConfigImagesNamespace); - imageDeleteOp.setWriteCommandRequestBase([] { - write_ops::WriteCommandRequestBase base; - base.setOrdered(false); - return base; - }()); - imageDeleteOp.setDeletes([&] { - std::vector entries; - for (const auto& transactionSessionId : transactionSessionIdsToReap) { - entries.emplace_back( - BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), - false /* multi = false */); - } - return entries; - }()); - return imageDeleteOp; - }())); - - // Remove the config.transaction entries for the expired transaction session ids. - auto sessionDeleteReply = write_ops::checkWriteErrors(client.remove([&] { - write_ops::DeleteCommandRequest sessionDeleteOp( - NamespaceString::kSessionTransactionsTableNamespace); - sessionDeleteOp.setWriteCommandRequestBase([] { - write_ops::WriteCommandRequestBase base; - base.setOrdered(false); - return base; - }()); - sessionDeleteOp.setDeletes([&] { - std::vector entries; - for (const auto& transactionSessionId : transactionSessionIdsToReap) { - entries.emplace_back( - BSON(LogicalSessionRecord::kIdFieldName << transactionSessionId.toBSON()), - false /* multi = false */); - } - return entries; - }()); - return sessionDeleteOp; - }())); - - return sessionDeleteReply.getN(); + return removeSessionsTransactionRecordsFromDisk(opCtx, transactionSessionIdsToReap); } /** @@ -340,10 +346,6 @@ int removeExpiredTransactionSessionsFromDisk( findRequest.setProjection(kIdProjection); auto cursor = client.find(std::move(findRequest)); - // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size - // limit. 
- const int kMaxBatchSize = 10'000; - LogicalSessionIdSet expiredTransactionSessionIdsNotInUse; int numReaped = 0; while (cursor->more()) { @@ -357,13 +359,14 @@ int removeExpiredTransactionSessionsFromDisk( } expiredTransactionSessionIdsNotInUse.insert(transactionSessionId); - if (expiredTransactionSessionIdsNotInUse.size() > kMaxBatchSize) { - numReaped += removeSessionsTransactionRecords( + if (expiredTransactionSessionIdsNotInUse.size() > + MongoDSessionCatalog::kMaxSessionDeletionBatchSize) { + numReaped += removeSessionsTransactionRecordsIfExpired( opCtx, sessionsCollection, expiredTransactionSessionIdsNotInUse); expiredTransactionSessionIdsNotInUse.clear(); } } - numReaped += removeSessionsTransactionRecords( + numReaped += removeSessionsTransactionRecordsIfExpired( opCtx, sessionsCollection, expiredTransactionSessionIdsNotInUse); return numReaped; @@ -621,6 +624,22 @@ int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx, opCtx, sessionsCollection, possiblyExpired, expiredTransactionSessionIdsStillInUse); } +int MongoDSessionCatalog::removeSessionsTransactionRecords( + OperationContext* opCtx, const std::vector& transactionSessionIdsToRemove) { + std::vector nextLsidBatch; + int numReaped = 0; + for (const auto& transactionSessionIdToRemove : transactionSessionIdsToRemove) { + nextLsidBatch.push_back(transactionSessionIdToRemove); + if (nextLsidBatch.size() > MongoDSessionCatalog::kMaxSessionDeletionBatchSize) { + numReaped += removeSessionsTransactionRecordsFromDisk(opCtx, nextLsidBatch); + nextLsidBatch.clear(); + } + } + numReaped += removeSessionsTransactionRecordsFromDisk(opCtx, nextLsidBatch); + + return numReaped; +} + MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* opCtx) : _operationContextSession(opCtx) { invariant(!opCtx->getClient()->isInDirectClient()); diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h index eb8402aece4..5c092e7e005 100644 --- a/src/mongo/db/session_catalog_mongod.h +++ b/src/mongo/db/session_catalog_mongod.h @@ -39,6 +39,10 @@ class MongoDSessionCatalog { public: static const std::string kConfigTxnsPartialIndexName; + // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size + // limit. + static const int kMaxSessionDeletionBatchSize = 10'000; + /** * Returns the specification for the partial index on config.transactions used to support * retryable transactions. @@ -83,6 +87,12 @@ public: static int reapSessionsOlderThan(OperationContext* opCtx, SessionsCollection& sessionsCollection, Date_t possiblyExpired); + + /** + * Deletes the given session ids from config.transactions and config.image_collection. + */ + static int removeSessionsTransactionRecords(OperationContext* opCtx, + const std::vector& lsidsToRemove); }; /** -- cgit v1.2.1
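
For readers skimming the patch, the core mechanism is the new InternalTransactionsReapService: SessionCatalog::_releaseSession hands the ids of child sessions it just dropped from memory to the service via the setOnEagerlyReapedSessionsFn hook, the service buffers them, and once internalSessionsReapThreshold ids have accumulated (0 disables the feature) it schedules a single background drain that deletes the matching config.transactions and config.image_collection entries; the buffer is cleared on stepdown. What follows is a minimal, self-contained C++ sketch of that threshold-triggered buffering pattern, not MongoDB source: EagerReapBuffer, SessionId, and the std::async-based drain are illustrative stand-ins for the server's ThreadPool, LogicalSessionId, and ExecutorFuture machinery.

#include <chrono>
#include <cstddef>
#include <functional>
#include <future>
#include <iostream>
#include <mutex>
#include <string>
#include <utility>
#include <vector>

using SessionId = std::string;  // stand-in for a LogicalSessionId

class EagerReapBuffer {
public:
    EagerReapBuffer(std::size_t threshold, std::function<void(std::vector<SessionId>)> reapFn)
        : _threshold(threshold), _reapFn(std::move(reapFn)) {}

    // Buffer ids that were just reaped from memory; once the buffer reaches the
    // threshold and no drain is already in flight, schedule one background drain.
    void addEagerlyReapedSessions(std::vector<SessionId> ids) {
        std::lock_guard<std::mutex> lk(_mutex);
        if (_threshold == 0 || !_enabled)  // 0 disables eager reaping; secondaries buffer nothing
            return;
        _buffer.insert(_buffer.end(), ids.begin(), ids.end());

        const bool drainInFlight = _drain.valid() &&
            _drain.wait_for(std::chrono::seconds(0)) != std::future_status::ready;
        if (_buffer.size() >= _threshold && !drainInFlight) {
            _drain = std::async(std::launch::async, [this] { _drainOnce(); });
        }
    }

    void onStepUp() {
        std::lock_guard<std::mutex> lk(_mutex);
        _enabled = true;
    }

    void onStepDown() {
        std::lock_guard<std::mutex> lk(_mutex);
        _enabled = false;
        _buffer.clear();  // anything buffered on the old primary is left to the periodic reaper
    }

    void waitForDrain() {
        if (_drain.valid())
            _drain.wait();
    }

private:
    void _drainOnce() {
        std::vector<SessionId> toReap;
        {
            std::lock_guard<std::mutex> lk(_mutex);
            toReap.swap(_buffer);  // take everything accumulated, not just the threshold amount
        }
        // In the server this deletes from config.transactions and config.image_collection.
        _reapFn(std::move(toReap));
    }

    std::mutex _mutex;
    bool _enabled = false;
    std::size_t _threshold;
    std::vector<SessionId> _buffer;
    std::future<void> _drain;
    std::function<void(std::vector<SessionId>)> _reapFn;
};

int main() {
    EagerReapBuffer buffer(3, [](std::vector<SessionId> ids) {
        std::cout << "reaping " << ids.size() << " session records\n";
    });
    buffer.onStepUp();
    buffer.addEagerlyReapedSessions({"s1", "s2"});  // below threshold: nothing scheduled
    buffer.addEagerlyReapedSessions({"s3", "s4"});  // threshold reached: one drain of all four
    buffer.waitForDrain();
    return 0;
}

As in the patch, only one drain runs at a time: ids added while a drain is in progress simply accumulate and are picked up by the next drain, which is the behavior the OnlySchedulesOneReapAtATime unit test exercises with the pauseInternalTransactionsReaperAfterSwap failpoint.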