diff options
Diffstat (limited to 'src/mongo')
25 files changed, 699 insertions, 74 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 4128794d1c1..e63c3b20742 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1050,6 +1050,7 @@ env.Library( ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/s/query/cluster_query', + 'sessions_collection_rs', ], ) @@ -1082,6 +1083,23 @@ env.Library( ], ) +env.Library( + target='transaction_reaper', + source=[ + 'transaction_reaper.cpp', + ], + LIBDEPS_PRIVATE=[ + '$BUILD_DIR/mongo/db/commands/server_status', + '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', + '$BUILD_DIR/mongo/s/client/shard_interface', + '$BUILD_DIR/mongo/s/coreshard', + 'dbdirectclient', + 'logical_session_id', + 'sessions_collection', + 'write_ops', + ], +) + envWithAsio.CppUnitTest( target='logical_session_cache_test', source=[ @@ -1115,6 +1133,9 @@ envWithAsio.Library( 'sessions_collection_sharded', 'sessions_collection_standalone', ], + LIBDEPS_PRIVATE=[ + 'transaction_reaper', + ], ) envWithAsio.Library( diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 3efcaa19f17..99cb82a1298 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -68,6 +68,7 @@ env.Library( "kill_sessions_command.cpp", "mr_common.cpp", "parameters.cpp", + "reap_logical_session_cache_now.cpp", "refresh_logical_session_cache_now.cpp", "refresh_sessions_command.cpp", "refresh_sessions_command_internal.cpp", diff --git a/src/mongo/db/commands/reap_logical_session_cache_now.cpp b/src/mongo/db/commands/reap_logical_session_cache_now.cpp new file mode 100644 index 00000000000..a1817d4f24e --- /dev/null +++ b/src/mongo/db/commands/reap_logical_session_cache_now.cpp @@ -0,0 +1,96 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/logical_session_cache.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +namespace { + +class ReapLogicalSessionCacheNowCommand final : public BasicCommand { + MONGO_DISALLOW_COPYING(ReapLogicalSessionCacheNowCommand); + +public: + ReapLogicalSessionCacheNowCommand() : BasicCommand("reapLogicalSessionCacheNow") {} + + bool slaveOk() const override { + return true; + } + + bool adminOnly() const override { + return false; + } + + bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + + void help(std::stringstream& help) const override { + help << "force the logical session cache to reap. Test command only."; + } + + // No auth needed because it only works when enabled via command line. + Status checkAuthForOperation(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj) override { + return Status::OK(); + } + + virtual bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + auto cache = LogicalSessionCache::get(opCtx); + auto client = opCtx->getClient(); + + auto res = cache->reapNow(client); + if (!res.isOK()) { + return appendCommandStatus(result, res); + } + + return true; + } +}; + +MONGO_INITIALIZER(RegisterReapLogicalSessionCacheNowCommand)(InitializerContext* context) { + if (Command::testCommandsEnabled) { + // Leaked intentionally: a Command registers itself when constructed. + new ReapLogicalSessionCacheNowCommand(); + } + return Status::OK(); +} + +} // namespace + +} // namespace mongo diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 85ee094c037..7bef30a8593 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -776,7 +776,7 @@ ExitCode _initAndListen(int listenPort) { kind = LogicalSessionCacheServer::kReplicaSet; } - auto sessionCache = makeLogicalSessionCacheD(kind); + auto sessionCache = makeLogicalSessionCacheD(globalServiceContext, kind); LogicalSessionCache::set(globalServiceContext, std::move(sessionCache)); // MessageServer::run will return when exit code closes its socket and we don't need the diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 8e7ba166874..3a1ce97ef39 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -97,6 +97,11 @@ public: virtual Status refreshNow(Client* client) = 0; /** + * Reaps transaction records synchronously. + */ + virtual Status reapNow(Client* client) = 0; + + /** * Returns the current time. */ virtual Date_t now() = 0; diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp index 7f958e547a7..0fdc1135d56 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp @@ -26,6 +26,8 @@ * it in the license file. */ +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + #include "mongo/platform/basic.h" #include <memory> @@ -37,20 +39,22 @@ #include "mongo/db/sessions_collection_rs.h" #include "mongo/db/sessions_collection_sharded.h" #include "mongo/db/sessions_collection_standalone.h" +#include "mongo/db/transaction_reaper.h" #include "mongo/stdx/memory.h" +#include "mongo/util/log.h" namespace mongo { namespace { -std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) { +std::shared_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) { switch (state) { case LogicalSessionCacheServer::kSharded: - return stdx::make_unique<SessionsCollectionSharded>(); + return std::make_shared<SessionsCollectionSharded>(); case LogicalSessionCacheServer::kReplicaSet: - return stdx::make_unique<SessionsCollectionRS>(); + return std::make_shared<SessionsCollectionRS>(); case LogicalSessionCacheServer::kStandalone: - return stdx::make_unique<SessionsCollectionStandalone>(); + return std::make_shared<SessionsCollectionStandalone>(); default: MONGO_UNREACHABLE; } @@ -58,13 +62,28 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe } // namespace -std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) { +std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(ServiceContext* svc, + LogicalSessionCacheServer state) { auto liason = stdx::make_unique<ServiceLiasonMongod>(); // Set up the logical session cache auto sessionsColl = makeSessionsCollection(state); - return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liason), std::move(sessionsColl), LogicalSessionCacheImpl::Options{}); + + auto reaper = [&]() -> std::shared_ptr<TransactionReaper> { + switch (state) { + case LogicalSessionCacheServer::kSharded: + return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl); + case LogicalSessionCacheServer::kReplicaSet: + return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl); + default: + return nullptr; + } + }(); + + return stdx::make_unique<LogicalSessionCacheImpl>(std::move(liason), + std::move(sessionsColl), + std::move(reaper), + LogicalSessionCacheImpl::Options{}); } } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_factory_mongod.h b/src/mongo/db/logical_session_cache_factory_mongod.h index e69c459d469..b6ac0430fd0 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.h +++ b/src/mongo/db/logical_session_cache_factory_mongod.h @@ -37,6 +37,9 @@ namespace mongo { enum class LogicalSessionCacheServer { kSharded, kReplicaSet, kStandalone }; -std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state); +class ServiceContext; + +std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(ServiceContext* svc, + LogicalSessionCacheServer state); } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp index 6ba42da89de..9822ec00b93 100644 --- a/src/mongo/db/logical_session_cache_factory_mongos.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongos.cpp @@ -45,7 +45,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() { auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>(); return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liason), std::move(sessionsColl), LogicalSessionCacheImpl::Options{}); + std::move(liason), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{}); } } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index e84d431083b..ffafcccbd4f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -58,18 +58,22 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, f constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; -LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service, - std::unique_ptr<SessionsCollection> collection, - Options options) +LogicalSessionCacheImpl::LogicalSessionCacheImpl( + std::unique_ptr<ServiceLiason> service, + std::shared_ptr<SessionsCollection> collection, + std::shared_ptr<TransactionReaper> transactionReaper, + Options options) : _refreshInterval(options.refreshInterval), _sessionTimeout(options.sessionTimeout), _service(std::move(service)), _sessionsColl(std::move(collection)), + _transactionReaper(std::move(transactionReaper)), _cache(options.capacity) { if (!disableLogicalSessionCacheRefresh) { - PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); }, - duration_cast<Milliseconds>(_refreshInterval)}; - _service->scheduleJob(std::move(job)); + _service->scheduleJob( + {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); + _service->scheduleJob( + {[this](Client* client) { _periodicReap(client); }, _refreshInterval}); } } @@ -145,6 +149,10 @@ Status LogicalSessionCacheImpl::refreshNow(Client* client) { return _refresh(client); } +Status LogicalSessionCacheImpl::reapNow(Client* client) { + return _reap(client); +} + Date_t LogicalSessionCacheImpl::now() { return _service->now(); } @@ -163,6 +171,39 @@ void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { return; } +void LogicalSessionCacheImpl::_periodicReap(Client* client) { + auto res = _reap(client); + if (!res.isOK()) { + log() << "Failed to reap transaction table: " << res; + } + + return; +} + +Status LogicalSessionCacheImpl::_reap(Client* client) { + if (!_transactionReaper) { + return Status::OK(); + } + + try { + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + stdx::lock_guard<stdx::mutex> lk(_reaperMutex); + _transactionReaper->reap(opCtx); + } catch (...) { + return exceptionToStatus(); + } + + return Status::OK(); +} + Status LogicalSessionCacheImpl::_refresh(Client* client) { LogicalSessionRecordSet activeSessions; LogicalSessionRecordSet deadSessions; diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 47a2f897f3b..688e04a8a37 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -34,6 +34,7 @@ #include "mongo/db/service_liason.h" #include "mongo/db/sessions_collection.h" #include "mongo/db/time_proof_service.h" +#include "mongo/db/transaction_reaper.h" #include "mongo/platform/atomic_word.h" #include "mongo/stdx/thread.h" #include "mongo/util/lru_cache.h" @@ -96,7 +97,8 @@ public: * Construct a new session cache. */ explicit LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service, - std::unique_ptr<SessionsCollection> collection, + std::shared_ptr<SessionsCollection> collection, + std::shared_ptr<TransactionReaper> transactionReaper, Options options = Options{}); LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete; @@ -119,6 +121,8 @@ public: Status refreshNow(Client* client) override; + Status reapNow(Client* client) override; + Date_t now() override; size_t size() override; @@ -138,6 +142,9 @@ private: void _periodicRefresh(Client* client); Status _refresh(Client* client); + void _periodicReap(Client* client); + Status _reap(Client* client); + /** * Returns true if a record has passed its given expiration. */ @@ -152,7 +159,10 @@ private: const Minutes _sessionTimeout; std::unique_ptr<ServiceLiason> _service; - std::unique_ptr<SessionsCollection> _sessionsColl; + std::shared_ptr<SessionsCollection> _sessionsColl; + + mutable stdx::mutex _reaperMutex; + std::shared_ptr<TransactionReaper> _transactionReaper; mutable stdx::mutex _cacheMutex; LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache; diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index 7c5e0be1784..077efa741b9 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -66,6 +66,10 @@ public: return Status::OK(); } + Status reapNow(Client* client) override { + return Status::OK(); + } + Date_t now() override { return Date_t::now(); } diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index 6198b61d233..cb8a226dea7 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -69,8 +69,8 @@ public: auto mockService = stdx::make_unique<MockServiceLiason>(_service); auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions); - _cache = stdx::make_unique<LogicalSessionCacheImpl>(std::move(mockService), - std::move(mockSessions)); + _cache = stdx::make_unique<LogicalSessionCacheImpl>( + std::move(mockService), std::move(mockSessions), nullptr); } void tearDown() override { diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index e7bac0f5165..365f06edc0d 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -155,7 +155,7 @@ structs: description: "Parser for forming the fetch request for SessionsCollection::fetch" strict: true fields: - find: namespacestring + find: string filter: SessionsCollectionFetchRequestFilter projection: SessionsCollectionFetchRequestProjection batchSize: int diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp index 7f763c8f625..6d796fb7710 100644 --- a/src/mongo/db/logical_session_id_test.cpp +++ b/src/mongo/db/logical_session_id_test.cpp @@ -100,7 +100,7 @@ public: std::make_shared<MockSessionsCollectionImpl>()); auto localLogicalSessionCache = stdx::make_unique<LogicalSessionCacheImpl>( - std::move(localServiceLiason), std::move(localSessionsCollection)); + std::move(localServiceLiason), std::move(localSessionsCollection), nullptr); LogicalSessionCache::set(&serviceContext, std::move(localLogicalSessionCache)); } diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index d1618901350..b6341eb0423 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -144,14 +144,17 @@ Status runBulkCmd(StringData label, constexpr StringData SessionsCollection::kSessionsDb; constexpr StringData SessionsCollection::kSessionsCollection; constexpr StringData SessionsCollection::kSessionsFullNS; +const NamespaceString SessionsCollection::kSessionsNamespaceString = + NamespaceString{SessionsCollection::kSessionsFullNS}; SessionsCollection::~SessionsCollection() = default; -SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBClientBase* client) { - auto send = [client](BSONObj batch) -> Status { +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite( + const NamespaceString& ns, DBClientBase* client) { + auto send = [client, ns](BSONObj batch) -> Status { BSONObj res; - if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res)) { + if (!client->runCommand(ns.db().toString(), batch, res)) { return getStatusFromCommandResult(res); } @@ -167,10 +170,11 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(DBCl return send; } -SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClientBase* client) { - auto send = [client](BSONObj cmd) -> Status { +SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(const NamespaceString& ns, + DBClientBase* client) { + auto send = [client, ns](BSONObj cmd) -> Status { BSONObj res; - if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) { + if (!client->runCommand(ns.db().toString(), cmd, res)) { return getStatusFromCommandResult(res); } @@ -180,10 +184,11 @@ SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForCommand(DBClien return send; } -SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClientBase* client) { - auto send = [client](BSONObj cmd) -> StatusWith<BSONObj> { +SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const NamespaceString& ns, + DBClientBase* client) { + auto send = [client, ns](BSONObj cmd) -> StatusWith<BSONObj> { BSONObj res; - if (!client->runCommand(SessionsCollection::kSessionsDb.toString(), cmd, res)) { + if (!client->runCommand(ns.db().toString(), cmd, res)) { return getStatusFromCommandResult(res); } @@ -193,11 +198,12 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(DBClien return send; } -Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, +Status SessionsCollection::doRefresh(const NamespaceString& ns, + const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send) { - auto init = [](BSONObjBuilder* batch) { - batch->append("update", kSessionsCollection); + auto init = [ns](BSONObjBuilder* batch) { + batch->append("update", ns.coll()); batch->append("ordered", false); }; @@ -210,7 +216,8 @@ Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, return runBulkCmd("updates", init, add, send, sessions); } -Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sessions, +Status SessionsCollection::doRefreshExternal(const NamespaceString& ns, + const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send) { auto makeT = [] { return std::vector<LogicalSessionRecord>{}; }; @@ -227,9 +234,11 @@ Status SessionsCollection::doRefreshExternal(const LogicalSessionRecordSet& sess return runBulkGeneric(makeT, add, sendLocal, sessions); } -Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) { - auto init = [](BSONObjBuilder* batch) { - batch->append("delete", kSessionsCollection); +Status SessionsCollection::doRemove(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, + SendBatchFn send) { + auto init = [ns](BSONObjBuilder* batch) { + batch->append("delete", ns.coll()); batch->append("ordered", false); }; @@ -240,12 +249,15 @@ Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBat return runBulkCmd("deletes", init, add, send, sessions); } -Status SessionsCollection::doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send) { +Status SessionsCollection::doRemoveExternal(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, + SendBatchFn send) { // TODO SERVER-28335 Implement endSessions, with internal counterpart. return Status::OK(); } -StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSessionIdSet& sessions, +StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, FindBatchFn send) { auto makeT = [] { return std::vector<LogicalSessionId>{}; }; @@ -275,7 +287,7 @@ StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const LogicalSession auto sendLocal = [&](std::vector<LogicalSessionId>& batch) { SessionsCollectionFetchRequest request; - request.setFind(NamespaceString{SessionsCollection::kSessionsCollection}); + request.setFind(ns.coll()); request.setFilter({}); request.getFilter().set_id({}); request.getFilter().get_id().setIn(batch); diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 3c9a5d65fb1..e86e29cd67b 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -53,6 +53,8 @@ public: static constexpr StringData kSessionsCollection = "system.sessions"_sd; static constexpr StringData kSessionsFullNS = "admin.system.sessions"_sd; + static const NamespaceString kSessionsNamespaceString; + /** * Updates the last-use times on the given sessions to be greater than * or equal to the given time. Returns an error if a networking issue occurred. @@ -71,6 +73,9 @@ public: */ virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + virtual Status removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) = 0; + /** * Checks a set of lsids and returns the set that no longer exists * @@ -84,29 +89,39 @@ protected: * Makes a send function for the given client. */ using SendBatchFn = stdx::function<Status(BSONObj batch)>; - SendBatchFn makeSendFnForCommand(DBClientBase* client); - SendBatchFn makeSendFnForBatchWrite(DBClientBase* client); + SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client); + SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client); using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>; - FindBatchFn makeFindFnForCommand(DBClientBase* client); + FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client); /** * Formats and sends batches of refreshes for the given set of sessions. */ - Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send); - Status doRefreshExternal(const LogicalSessionRecordSet& sessions, + Status doRefresh(const NamespaceString& ns, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime, + SendBatchFn send); + Status doRefreshExternal(const NamespaceString& ns, + const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send); /** * Formats and sends batches of deletes for the given set of sessions. */ - Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send); - Status doRemoveExternal(const LogicalSessionIdSet& sessions, SendBatchFn send); + Status doRemove(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, + SendBatchFn send); + Status doRemoveExternal(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, + SendBatchFn send); /** * Formats and sends batches of fetches for the given set of sessions. */ - StatusWith<LogicalSessionIdSet> doFetch(const LogicalSessionIdSet& sessions, FindBatchFn send); + StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns, + const LogicalSessionIdSet& sessions, + FindBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index f88d877b17b..6a88a3af816 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -116,6 +116,11 @@ public: return LogicalSessionIdSet{}; } + Status removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override { + return Status::OK(); + } + private: std::shared_ptr<MockSessionsCollectionImpl> _impl; }; diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index 89387af9531..f4d5f44b453 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -84,13 +84,16 @@ Status makePrimaryConnection(OperationContext* opCtx, boost::optional<ScopedDbCo } template <typename Callback> -auto runIfStandaloneOrPrimary(OperationContext* opCtx, Callback callback) +auto runIfStandaloneOrPrimary(const NamespaceString& ns, + LockMode mode, + OperationContext* opCtx, + Callback callback) -> boost::optional<decltype(std::declval<Callback>()())> { - Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IX); - Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IX); + Lock::DBLock lk(opCtx, ns.db(), mode); + Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, mode); auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); - if (coord->canAcceptWritesForDatabase(opCtx, SessionsCollection::kSessionsDb)) { + if (coord->canAcceptWritesForDatabase(opCtx, ns.db())) { return callback(); } @@ -110,10 +113,14 @@ auto sendToPrimary(OperationContext* opCtx, Callback callback) } template <typename LocalCallback, typename RemoteCallback> -auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallback remoteCallback) +auto dispatch(const NamespaceString& ns, + LockMode mode, + OperationContext* opCtx, + LocalCallback localCallback, + RemoteCallback remoteCallback) -> decltype(std::declval<RemoteCallback>()(static_cast<DBClientBase*>(nullptr))) { // If we are the primary, write directly to ourself. - auto result = runIfStandaloneOrPrimary(opCtx, [&] { return localCallback(); }); + auto result = runIfStandaloneOrPrimary(ns, mode, opCtx, [&] { return localCallback(); }); if (result) { return *result; @@ -127,38 +134,83 @@ auto dispatch(OperationContext* opCtx, LocalCallback localCallback, RemoteCallba Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions, Date_t refreshTime) { - return dispatch(opCtx, + return dispatch( + kSessionsNamespaceString, + MODE_IX, + opCtx, + [&] { + DBDirectClient client(opCtx); + return doRefresh(kSessionsNamespaceString, + sessions, + refreshTime, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); + }, + [&](DBClientBase* client) { + return doRefreshExternal(kSessionsNamespaceString, + sessions, + refreshTime, + makeSendFnForCommand(kSessionsNamespaceString, client)); + }); +} + +Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return dispatch(kSessionsNamespaceString, + MODE_IX, + opCtx, [&] { DBDirectClient client(opCtx); - return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); + return doRemove(kSessionsNamespaceString, + sessions, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); }, [&](DBClientBase* client) { - return doRefreshExternal( - sessions, refreshTime, makeSendFnForCommand(client)); + return doRemoveExternal( + kSessionsNamespaceString, + sessions, + makeSendFnForCommand(kSessionsNamespaceString, client)); }); } -Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - return dispatch(opCtx, +StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + return dispatch(kSessionsNamespaceString, + MODE_IS, + opCtx, [&] { DBDirectClient client(opCtx); - return doRemove(sessions, makeSendFnForBatchWrite(&client)); + return doFetch(kSessionsNamespaceString, + sessions, + makeFindFnForCommand(kSessionsNamespaceString, &client)); }, [&](DBClientBase* client) { - return doRemoveExternal(sessions, makeSendFnForCommand(client)); + return doFetch(kSessionsNamespaceString, + sessions, + makeFindFnForCommand(kSessionsNamespaceString, client)); }); } -StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( - OperationContext* opCtx, const LogicalSessionIdSet& sessions) { +Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { return dispatch( + kSessionsNamespaceString, + MODE_IX, opCtx, [&] { DBDirectClient client(opCtx); - return doFetch(sessions, makeFindFnForCommand(&client)); + return doRemove(NamespaceString::kSessionTransactionsTableNamespace, + sessions, + makeSendFnForBatchWrite( + NamespaceString::kSessionTransactionsTableNamespace, &client)); }, - [&](DBClientBase* client) { return doFetch(sessions, makeFindFnForCommand(client)); }); + [](DBClientBase*) { + return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records"); + }); +} + +Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions); } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h index a273dd88955..cc49ecf511b 100644 --- a/src/mongo/db/sessions_collection_rs.h +++ b/src/mongo/db/sessions_collection_rs.h @@ -68,6 +68,12 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + Status removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; + + static Status removeTransactionRecordsHelper(OperationContext* opCtx, + const LogicalSessionIdSet& sessions); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index 6c35d317b26..544915c8494 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -34,6 +34,7 @@ #include "mongo/db/operation_context.h" #include "mongo/db/query/canonical_query.h" #include "mongo/db/query/query_request.h" +#include "mongo/db/sessions_collection_rs.h" #include "mongo/s/commands/cluster_write.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/write_ops/batch_write_exec.h" @@ -71,7 +72,7 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, return Status(error, response.getErrMessage()); }; - return doRefresh(sessions, refreshTime, send); + return doRefresh(kSessionsNamespaceString, sessions, refreshTime, send); } Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, @@ -94,7 +95,7 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, return Status(error, response.getErrMessage()); }; - return doRemove(sessions, send); + return doRemove(kSessionsNamespaceString, sessions, send); } StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( @@ -140,7 +141,12 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return result.obj(); }; - return doFetch(sessions, send); + return doFetch(kSessionsNamespaceString, sessions, send); +} + +Status SessionsCollectionSharded::removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + return SessionsCollectionRS::removeTransactionRecordsHelper(opCtx, sessions); } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index cb0a2e6fd9e..f9a8165a1c7 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -58,6 +58,9 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + Status removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 8187b19ad06..ada3583bb26 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -48,19 +48,31 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions, Date_t refreshTime) { DBDirectClient client(opCtx); - return doRefresh(sessions, refreshTime, makeSendFnForBatchWrite(&client)); + return doRefresh(kSessionsNamespaceString, + sessions, + refreshTime, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); } Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) { DBDirectClient client(opCtx); - return doRemove(sessions, makeSendFnForBatchWrite(&client)); + return doRemove(kSessionsNamespaceString, + sessions, + makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); } StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { DBDirectClient client(opCtx); - return doFetch(sessions, makeFindFnForCommand(&client)); + return doFetch(kSessionsNamespaceString, + sessions, + makeFindFnForCommand(kSessionsNamespaceString, &client)); +} + +Status SessionsCollectionStandalone::removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + MONGO_UNREACHABLE; } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index 2c1bfe32096..2ecd49afa79 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -57,6 +57,9 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + + Status removeTransactionRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp new file mode 100644 index 00000000000..7257a56f29a --- /dev/null +++ b/src/mongo/db/transaction_reaper.cpp @@ -0,0 +1,249 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/transaction_reaper.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/db/client.h" +#include "mongo/db/concurrency/d_concurrency.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/repl/replication_coordinator.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/session_txn_record.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard.h" +#include "mongo/s/client/shard_registry.h" +#include "mongo/s/grid.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/destructor_guard.h" + +namespace mongo { + +namespace { + +constexpr Minutes kTransactionRecordMinimumLifetime(30); + +/** + * The minimum lifetime for a transaction record is how long it has to have lived on the server + * before we'll consider it for cleanup. This is effectively the window for how long it is + * permissible for a mongos to hang before we're willing to accept a failure of the retryable write + * subsystem. + * + * Specifically, we imagine that a client connects to one mongos on a session and performs a + * retryable write. That mongos hangs. Then the client connects to a new mongos on the same + * session and successfully executes its write. After a day passes, the session will time out, + * cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and + * executes the write (because all records of the session + transaction have been deleted). + * + * So the write is performed twice, which is unavoidable without losing session vivification and/or + * requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker + * guarantee after the minimum transaction lifetime. + */ +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes, + int, + kTransactionRecordMinimumLifetime.count()); + +const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); +const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); + +/** + * Makes the query we'll use to scan the transactions table. + * + * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt + * to pull records likely to be on the same chunks (because they sort near each other). + */ +Query makeQuery(Date_t now) { + Timestamp possiblyExpired( + duration_cast<Seconds>( + (now - Minutes(TransactionRecordMinimumLifetimeMinutes)).toDurationSinceEpoch()), + 0); + BSONObjBuilder bob; + { + BSONObjBuilder subbob(bob.subobjStart(SessionTxnRecord::kLastWriteOpTimeTsFieldName)); + subbob.append("$lt", possiblyExpired); + } + Query query(bob.obj()); + query.sort(kSortById); + return query; +} + +/** + * Our impl is templatized on a type which handles the lsids we see. It provides the top level + * scaffolding for figuring out if we're the primary node responsible for the transaction table and + * invoking the hanlder. + * + * The handler here will see all of the possibly expired txn ids in the transaction table and will + * have a lifetime associated with a single call to reap. + */ +template <typename Handler> +class TransactionReaperImpl final : public TransactionReaper { +public: + TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection) + : _collection(std::move(collection)) {} + + void reap(OperationContext* opCtx) override { + Handler handler(opCtx, _collection.get()); + + Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IS); + Lock::CollectionLock lock(opCtx->lockState(), SessionsCollection::kSessionsFullNS, MODE_IS); + + auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); + if (coord->canAcceptWritesForDatabase( + opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { + DBDirectClient client(opCtx); + + auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + query, + 0, + 0, + &kIdProjection); + + while (cursor->more()) { + auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( + "TransactionSession"_sd, cursor->next()); + + handler.handleLsid(transactionSession.get_id()); + } + } + } + +private: + std::shared_ptr<SessionsCollection> _collection; +}; + +void handleBatchHelper(SessionsCollection* sessionsCollection, + OperationContext* opCtx, + const LogicalSessionIdSet& batch) { + auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch)); + uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed)); +} + +/** + * The repl impl is simple, just pass along to the sessions collection for checking ids locally + */ +class ReplHandler { +public: + ReplHandler(OperationContext* opCtx, SessionsCollection* collection) + : _opCtx(opCtx), _sessionsCollection(collection) {} + + ~ReplHandler() { + DESTRUCTOR_GUARD([&] { handleBatchHelper(_sessionsCollection, _opCtx, _batch); }()); + } + + void handleLsid(const LogicalSessionId& lsid) { + _batch.insert(lsid); + if (_batch.size() > write_ops::kMaxWriteBatchSize) { + handleBatchHelper(_sessionsCollection, _opCtx, _batch); + _batch.clear(); + } + } + +private: + OperationContext* _opCtx; + SessionsCollection* _sessionsCollection; + + LogicalSessionIdSet _batch; +}; + +/** + * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small + * scans. + */ +class ShardedHandler { +public: + ShardedHandler(OperationContext* opCtx, SessionsCollection* collection) + : _opCtx(opCtx), _sessionsCollection(collection) {} + + ~ShardedHandler() { + DESTRUCTOR_GUARD([&] { + for (const auto& pair : _shards) { + handleBatchHelper(_sessionsCollection, _opCtx, pair.second); + } + }()); + } + + void handleLsid(const LogicalSessionId& lsid) { + // There are some lifetime issues with when the reaper starts up versus when the grid is + // available. Moving routing info fetching until after we have a transaction moves us past + // the problem. + // + // Also, we should only need the chunk case, but that'll wait until the sessions table is + // actually sharded. + if (!(_cm || _primary)) { + auto routingInfo = + uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo( + _opCtx, SessionsCollection::kSessionsFullNS)); + _cm = routingInfo.cm(); + _primary = routingInfo.primary(); + } + ShardId shardId; + if (_cm) { + auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); + shardId = chunk->getShardId(); + } else { + shardId = _primary->getId(); + } + auto& lsids = _shards[shardId]; + lsids.insert(lsid); + if (lsids.size() > write_ops::kMaxWriteBatchSize) { + handleBatchHelper(_sessionsCollection, _opCtx, lsids); + _shards.erase(shardId); + } + } + +private: + OperationContext* _opCtx; + SessionsCollection* _sessionsCollection; + std::shared_ptr<ChunkManager> _cm; + std::shared_ptr<Shard> _primary; + + stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; +}; + +} // namespace + +std::unique_ptr<TransactionReaper> TransactionReaper::make( + Type type, std::shared_ptr<SessionsCollection> collection) { + switch (type) { + case Type::kReplicaSet: + return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection)); + case Type::kSharded: + return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection)); + } + MONGO_UNREACHABLE; +} + +TransactionReaper::~TransactionReaper() = default; + +} // namespace mongo diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h new file mode 100644 index 00000000000..fdec1083916 --- /dev/null +++ b/src/mongo/db/transaction_reaper.h @@ -0,0 +1,62 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#pragma once + +#include <memory> + +namespace mongo { + +class ServiceContext; +class SessionsCollection; +class OperationContext; + +/** + * TransactionReaper is responsible for scanning the transaction table, checking if sessions are + * still alive and deleting the transaction records if their sessions have expired. + */ +class TransactionReaper { +public: + enum class Type { + kReplicaSet, + kSharded, + }; + + virtual ~TransactionReaper() = 0; + + virtual void reap(OperationContext* OperationContext) = 0; + + /** + * The implementation of the sessions collections is different in replica sets versus sharded + * clusters, so we have a factory to pick the right impl. + */ + static std::unique_ptr<TransactionReaper> make(Type type, + std::shared_ptr<SessionsCollection> collection); +}; + +} // namespace mongo |