diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-04-18 18:11:49 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-09-05 10:37:59 -0400 |
commit | 273ed84bc7eb6cc0e38eca16088fc1a454f1cb28 (patch) | |
tree | 6ee8e5dcdedffc4b1ce0a48bf871248fc3984816 | |
parent | d2c2e6c73c424d5a28d5bd2a9031e4796a5e4371 (diff) | |
download | mongo-273ed84bc7eb6cc0e38eca16088fc1a454f1cb28.tar.gz |
SERVER-37837 Move `config.transactions` manipulation out of SessionsCollection
(cherry picked from commit dabbf059e6f96edb4898b42d532d460efd2510f2)
(cherry picked from commit 2b40ec0f649def6e120b78510e8b008a43852a09)
-rw-r--r-- | src/mongo/db/SConscript | 14 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_factory_mongod.cpp | 46 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_factory_mongos.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 16 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.h | 51 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_mock.h | 2 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.h | 3 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_mock.h | 5 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 22 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_rs.h | 18 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.h | 3 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.cpp | 5 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.h | 3 | ||||
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 106 | ||||
-rw-r--r-- | src/mongo/db/transaction_reaper_test.cpp | 104 | ||||
-rw-r--r-- | src/mongo/embedded/logical_session_cache_factory_embedded.cpp | 2 |
18 files changed, 222 insertions, 189 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 76c666b0f0d..b7ba27da87c 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1397,20 +1397,24 @@ envWithAsio.CppUnitTest( target='logical_session_cache_test', source=[ 'logical_session_cache_test.cpp', + 'transaction_reaper_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/executor/async_timer_mock', + '$BUILD_DIR/mongo/util/clock_source_mock', 'auth/authmocks', - 'keys_collection_manager', + 'dbdirectclient', 'keys_collection_document', + 'keys_collection_manager', 'logical_clock', - 'logical_session_id', - 'logical_session_id_helpers', - 'logical_session_cache', 'logical_session_cache_impl', + 'logical_session_id_helpers', + 'logical_session_id', + 'repl/replmocks', + 'service_context_d_test_fixture', 'service_liaison_mock', - 'service_context_test_fixture', 'sessions_collection_mock', + 'transaction_reaper', ], ) diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp index 8eafdc8bf57..0b7544adc18 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp @@ -48,30 +48,23 @@ namespace mongo { -namespace { - -std::shared_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheServer state) { - switch (state) { - case LogicalSessionCacheServer::kSharded: - return std::make_shared<SessionsCollectionSharded>(); - case LogicalSessionCacheServer::kConfigServer: - return std::make_shared<SessionsCollectionConfigServer>(); - case LogicalSessionCacheServer::kReplicaSet: - return std::make_shared<SessionsCollectionRS>(); - case LogicalSessionCacheServer::kStandalone: - return std::make_shared<SessionsCollectionStandalone>(); - default: - MONGO_UNREACHABLE; - } -} - -} // namespace - std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) { auto liaison = stdx::make_unique<ServiceLiaisonMongod>(); - // Set up the logical session cache - auto sessionsColl = makeSessionsCollection(state); + auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> { + switch (state) { + case LogicalSessionCacheServer::kSharded: + return std::make_shared<SessionsCollectionSharded>(); + case LogicalSessionCacheServer::kConfigServer: + return std::make_shared<SessionsCollectionConfigServer>(); + case LogicalSessionCacheServer::kReplicaSet: + return std::make_shared<SessionsCollectionRS>(); + case LogicalSessionCacheServer::kStandalone: + return std::make_shared<SessionsCollectionStandalone>(); + } + + MONGO_UNREACHABLE; + }(); auto reaper = [&]() -> std::shared_ptr<TransactionReaper> { switch (state) { @@ -79,15 +72,16 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl); case LogicalSessionCacheServer::kReplicaSet: return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl); - default: + case LogicalSessionCacheServer::kConfigServer: + case LogicalSessionCacheServer::kStandalone: return nullptr; } + + MONGO_UNREACHABLE; }(); - return stdx::make_unique<LogicalSessionCacheImpl>(std::move(liaison), - std::move(sessionsColl), - std::move(reaper), - LogicalSessionCacheImpl::Options{}); + return stdx::make_unique<LogicalSessionCacheImpl>( + std::move(liaison), std::move(sessionsColl), std::move(reaper)); } } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp index 078858d4569..2f3c297b202 100644 --- a/src/mongo/db/logical_session_cache_factory_mongos.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongos.cpp @@ -47,7 +47,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() { auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>(); return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liaison), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{}); + std::move(liaison), std::move(sessionsColl), nullptr /* reaper */); } } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 9d9b1498776..c2b282baf3f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -72,11 +72,8 @@ constexpr Milliseconds LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; LogicalSessionCacheImpl::LogicalSessionCacheImpl( std::unique_ptr<ServiceLiaison> service, std::shared_ptr<SessionsCollection> collection, - std::shared_ptr<TransactionReaper> transactionReaper, - Options options) - : _refreshInterval(options.refreshInterval), - _sessionTimeout(options.sessionTimeout), - _service(std::move(service)), + std::shared_ptr<TransactionReaper> transactionReaper) + : _service(std::move(service)), _sessionsColl(std::move(collection)), _transactionReaper(std::move(transactionReaper)) { _stats.setLastSessionsCollectionJobTimestamp(now()); @@ -85,11 +82,12 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( if (!disableLogicalSessionCacheRefresh) { _service->scheduleJob({"LogicalSessionCacheRefresh", [this](Client* client) { _periodicRefresh(client); }, - _refreshInterval}); + Milliseconds(logicalSessionRefreshMillis)}); + if (_transactionReaper) { _service->scheduleJob({"LogicalSessionCacheReap", [this](Client* client) { _periodicReap(client); }, - _refreshInterval}); + Milliseconds(logicalSessionRefreshMillis)}); } } } @@ -98,8 +96,8 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { try { _service->join(); } catch (...) { - // If we failed to join we might still be running a background thread, - // log but swallow the error since there is no good way to recover. + // If we failed to join we might still be running a background thread, log but swallow the + // error since there is no good way to recover severe() << "Failed to join background service thread"; } } diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 4d2299a157f..9158da1397a 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -47,52 +47,30 @@ class Client; class OperationContext; class ServiceContext; -extern int logicalSessionRefreshMillis; - /** * A thread-safe cache structure for logical session records. * - * The cache takes ownership of the passed-in ServiceLiaison and - * SessionsCollection helper types. + * The cache takes ownership of the passed-in ServiceLiaison and SessionsCollection helper types. + * + * Uses the following service-wide parameters: + * - A timeout value to use for sessions in the cache, in minutes. Defaults to 30 minutes. + * --setParameter localLogicalSessionTimeoutMinutes=X + * + * - The interval over which the cache will refresh session records. By default, this is set to + * every 5 minutes (300,000). If the caller is setting the sessionTimeout by hand, it is + * suggested that they consider also setting the refresh interval accordingly. + * --setParameter logicalSessionRefreshMillis=X. */ class LogicalSessionCacheImpl final : public LogicalSessionCache { public: static constexpr Milliseconds kLogicalSessionDefaultRefresh = Milliseconds(5 * 60 * 1000); /** - * An Options type to support the LogicalSessionCacheImpl. - */ - struct Options { - Options(){}; - - /** - * A timeout value to use for sessions in the cache, in minutes. - * - * By default, this is set to 30 minutes. - * - * May be set with --setParameter localLogicalSessionTimeoutMinutes=X. - */ - Minutes sessionTimeout = Minutes(localLogicalSessionTimeoutMinutes); - - /** - * The interval over which the cache will refresh session records. - * - * By default, this is set to every 5 minutes (300,000). If the caller - * is setting the sessionTimeout by hand, it is suggested that they - * consider also setting the refresh interval accordingly. - * - * May be set with --setParameter logicalSessionRefreshMillis=X. - */ - Milliseconds refreshInterval = Milliseconds(logicalSessionRefreshMillis); - }; - - /** * Construct a new session cache. */ - explicit LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, - std::shared_ptr<SessionsCollection> collection, - std::shared_ptr<TransactionReaper> transactionReaper, - Options options = Options{}); + LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, + std::shared_ptr<SessionsCollection> collection, + std::shared_ptr<TransactionReaper> transactionReaper); LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete; LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete; @@ -150,9 +128,6 @@ private: */ Status _addToCache(LogicalSessionRecord record); - const Milliseconds _refreshInterval; - const Minutes _sessionTimeout; - // This value is only modified under the lock, and is modified // automatically by the background jobs. LogicalSessionCacheStats _stats; diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index fe654e6adc3..8bbf986d638 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -55,7 +55,7 @@ namespace mongo { namespace { const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout); -const Milliseconds kForceRefresh = LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; +const Milliseconds kForceRefresh{LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh}; using SessionList = std::list<LogicalSessionId>; using unittest::EnsureFCV; @@ -80,7 +80,7 @@ public: auto mockService = stdx::make_unique<MockServiceLiaison>(_service); auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions); _cache = stdx::make_unique<LogicalSessionCacheImpl>( - std::move(mockService), std::move(mockSessions), nullptr); + std::move(mockService), std::move(mockSessions), nullptr /* reaper */); } void waitUntilRefreshScheduled() { diff --git a/src/mongo/db/repl/replication_coordinator_mock.h b/src/mongo/db/repl/replication_coordinator_mock.h index 94005d84f5d..7decacbdc96 100644 --- a/src/mongo/db/repl/replication_coordinator_mock.h +++ b/src/mongo/db/repl/replication_coordinator_mock.h @@ -281,8 +281,6 @@ public: */ void alwaysAllowWrites(bool allowWrites); - void setMaster(bool isMaster); - virtual ServiceContext* getServiceContext() override { return _service; } diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 5cb0c595419..5daabee3d82 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -81,9 +81,6 @@ public: */ virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; - virtual Status removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) = 0; - /** * Checks a set of lsids and returns the set that no longer exists * diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index c7cb1ff5338..6bee96cf6af 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -128,11 +128,6 @@ public: return _impl->findRemovedSessions(opCtx, sessions); } - Status removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) override { - return Status::OK(); - } - private: std::shared_ptr<MockSessionsCollectionImpl> _impl; }; diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index e916fda9d6c..c0c33539772 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -259,26 +259,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( }); } -Status SessionsCollectionRS::removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - return dispatch( - NamespaceString::kSessionTransactionsTableNamespace, - opCtx, - [&] { - DBDirectClient client(opCtx); - return doRemove(NamespaceString::kSessionTransactionsTableNamespace, - sessions, - makeSendFnForBatchWrite( - NamespaceString::kSessionTransactionsTableNamespace, &client)); - }, - [](DBClientBase*) { - return Status(ErrorCodes::NotMaster, "Not eligible to remove transaction records"); - }); -} - -Status SessionsCollectionRS::removeTransactionRecordsHelper(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - return SessionsCollectionRS{}.removeTransactionRecords(opCtx, sessions); -} - } // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h index 0dfe92fe301..f79c704b04f 100644 --- a/src/mongo/db/sessions_collection_rs.h +++ b/src/mongo/db/sessions_collection_rs.h @@ -90,24 +90,6 @@ public: */ StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - - /** - * Removes the transaction records for the specified sessions from the - * transaction table. - * - * If a step-down happens on this node as this method is running, it may fail. - */ - Status removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) override; - - /** - * Helper for a shard server to run its transaction operations as a replica set - * member. - * - * If a step-down happens on this node as this method is running, it may fail. - */ - static Status removeTransactionRecordsHelper(OperationContext* opCtx, - const LogicalSessionIdSet& sessions); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index a4f5735954b..bae7f3ef8c6 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -164,9 +164,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return doFetch(kSessionsNamespaceString, sessions, send); } -Status SessionsCollectionSharded::removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - return SessionsCollectionRS::removeTransactionRecordsHelper(opCtx, sessions); -} - } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index 39e1ff29087..112c56aae26 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -72,9 +72,6 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - Status removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) override; - protected: Status _checkCacheForSessionsCollection(OperationContext* opCtx); }; diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 0cc7a105353..67d61e0c8a2 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -122,9 +122,4 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSession makeFindFnForCommand(kSessionsNamespaceString, &client)); } -Status SessionsCollectionStandalone::removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) { - MONGO_UNREACHABLE; -} - } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index ac45fe33336..81c32c9c817 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -68,9 +68,6 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; - - Status removeTransactionRecords(OperationContext* opCtx, - const LogicalSessionIdSet& sessions) override; }; } // namespace mongo diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index 2460284f21e..b50f7eecc77 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -33,9 +33,6 @@ #include "mongo/db/transaction_reaper.h" #include "mongo/bson/bsonmisc.h" -#include "mongo/db/catalog_raii.h" -#include "mongo/db/client.h" -#include "mongo/db/curop.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" #include "mongo/db/ops/write_ops.h" @@ -48,7 +45,7 @@ #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" -#include "mongo/stdx/memory.h" +#include "mongo/s/write_ops/batched_command_response.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -108,24 +105,21 @@ public: : _collection(std::move(collection)) {} int reap(OperationContext* opCtx) override { - auto const coord = mongo::repl::ReplicationCoordinator::get(opCtx); - Handler handler(opCtx, *_collection); if (!handler.initialize()) { return 0; } - AutoGetCollection autoColl( - opCtx, NamespaceString::kSessionTransactionsTableNamespace, MODE_IS); - - // Only start reaping if the shard or config server node is currently the primary - if (!coord->canAcceptWritesForDatabase( + // Make a best-effort attempt to only reap when the node is running as a primary + const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); + if (!coord->canAcceptWritesForDatabase_UNSAFE( opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { return 0; } DBDirectClient client(opCtx); + // Fill all stale config.transactions entries auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); auto cursor = client.query( NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection); @@ -149,35 +143,60 @@ private: * Removes the specified set of session ids from the persistent sessions collection and returns the * number of sessions actually removed. */ -int removeSessionsRecords(OperationContext* opCtx, - SessionsCollection& sessionsCollection, - const LogicalSessionIdSet& sessionIdsToRemove) { +int removeSessionsTransactionRecords(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + const LogicalSessionIdSet& sessionIdsToRemove) { if (sessionIdsToRemove.empty()) { return 0; } - Locker* locker = opCtx->lockState(); - - Locker::LockSnapshot snapshot; - invariant(locker->saveLockStateAndUnlock(&snapshot)); - - const auto guard = MakeGuard([&] { - UninterruptibleLockGuard noInterrupt(opCtx->lockState()); - locker->restoreLockState(opCtx, snapshot); - }); - - // Top-level locks are freed, release any potential low-level (storage engine-specific - // locks). If we are yielding, we are at a safe place to do so. - opCtx->recoveryUnit()->abandonSnapshot(); - - // Track the number of yields in CurOp. - CurOp::get(opCtx)->yielded(); - - auto removed = + // From the passed-in sessions, find the ones which are actually expired/removed + auto expiredSessionIds = uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); - uassertStatusOK(sessionsCollection.removeTransactionRecords(opCtx, removed)); - return removed.size(); + DBDirectClient client(opCtx); + int numDeleted = 0; + + for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) { + write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace); + deleteOp.setWriteCommandBase([] { + write_ops::WriteCommandBase base; + base.setOrdered(false); + return base; + }()); + deleteOp.setDeletes([&] { + // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object + // size limit + const int kMaxBatchSize = 10'000; + std::vector<write_ops::DeleteOpEntry> entries; + for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) { + entries.emplace_back([&] { + write_ops::DeleteOpEntry entry; + entry.setQ(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON())); + entry.setMulti(false); + return entry; + }()); + } + + return entries; + }()); + + BSONObj result; + client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(), + deleteOp.toBSON({}), + result); + + BatchedCommandResponse response; + std::string errmsg; + uassert(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse response " << result, + response.parseBSON(result, &errmsg)); + uassertStatusOK(response.getTopLevelStatus()); + + numDeleted += response.getN(); + } + + return numDeleted; } /** @@ -195,8 +214,8 @@ public: void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); - if (_batch.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); + if (_batch.size() >= write_ops::kMaxWriteBatchSize) { + _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch); _batch.clear(); } } @@ -205,7 +224,7 @@ public: invariant(!_finalized); _finalized = true; - _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, _batch); + _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch); return _numReaped; } @@ -240,14 +259,18 @@ public: void handleLsid(const LogicalSessionId& lsid) { invariant(_cm); + + // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain + // batches of lsids, which only fall on the same shard, so that the query to check whether + // they are alive doesn't need to do cross-shard scatter/gather queries const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); - const auto shardId = chunk.getShardId(); + const auto& shardId = chunk.getShardId(); auto& lsids = _shards[shardId]; lsids.insert(lsid); - if (lsids.size() > write_ops::kMaxWriteBatchSize) { - _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, lsids); + if (lsids.size() >= write_ops::kMaxWriteBatchSize) { + _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids); _shards.erase(shardId); } } @@ -257,7 +280,8 @@ public: _finalized = true; for (const auto& pair : _shards) { - _numReaped += removeSessionsRecords(_opCtx, _sessionsCollection, pair.second); + _numReaped += + removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second); } return _numReaped; diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/transaction_reaper_test.cpp new file mode 100644 index 00000000000..fe78891ef13 --- /dev/null +++ b/src/mongo/db/transaction_reaper_test.cpp @@ -0,0 +1,104 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/repl/replication_coordinator_mock.h" +#include "mongo/db/service_context_d_test_fixture.h" +#include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/sessions_collection_mock.h" +#include "mongo/db/transaction_reaper.h" +#include "mongo/util/clock_source_mock.h" + +namespace mongo { +namespace { + +class TransactionReaperTest : public ServiceContextMongoDTest { +protected: + void setUp() override { + const auto service = getServiceContext(); + auto replCoord = std::make_unique<repl::ReplicationCoordinatorMock>(service); + ASSERT_OK(replCoord->setFollowerMode(repl::MemberState::RS_PRIMARY)); + + repl::ReplicationCoordinator::set(service, std::move(replCoord)); + service->setFastClockSource(std::make_unique<ClockSourceMock>()); + } + + ClockSourceMock* clock() { + return dynamic_cast<ClockSourceMock*>(getServiceContext()->getFastClockSource()); + } + + ServiceContext::UniqueOperationContext _uniqueOpCtx{makeOperationContext()}; + OperationContext* _opCtx{_uniqueOpCtx.get()}; + + std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{ + std::make_shared<MockSessionsCollectionImpl>()}; + + std::unique_ptr<TransactionReaper> _reaper{ + TransactionReaper::make(TransactionReaper::Type::kReplicaSet, + std::make_shared<MockSessionsCollection>(_collectionMock))}; +}; + +TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) { + _collectionMock->add([&] { + LogicalSessionRecord rec; + rec.setId(makeLogicalSessionIdForTest()); + rec.setLastUse(clock()->now()); + return rec; + }()); + + _collectionMock->add([&] { + LogicalSessionRecord rec; + rec.setId(makeLogicalSessionIdForTest()); + rec.setLastUse(clock()->now()); + return rec; + }()); + + DBDirectClient client(_opCtx); + SessionTxnRecord txn1; + txn1.setSessionId(makeLogicalSessionIdForTest()); + txn1.setTxnNum(100); + txn1.setLastWriteOpTime(repl::OpTime(Timestamp(100), 1)); + txn1.setLastWriteDate(clock()->now()); + SessionTxnRecord txn2; + txn2.setSessionId(makeLogicalSessionIdForTest()); + txn2.setTxnNum(200); + txn2.setLastWriteOpTime(repl::OpTime(Timestamp(200), 1)); + txn2.setLastWriteDate(clock()->now()); + + client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), + std::vector<BSONObj>{txn1.toBSON(), txn2.toBSON()}); + + clock()->advance(Minutes{31}); + ASSERT_EQ(2, _reaper->reap(_opCtx)); +} + +} // namespace +} // namespace mongo diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp index 1a22a713c45..7f68667c466 100644 --- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp +++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp @@ -51,7 +51,7 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() { auto sessionsColl = std::make_shared<SessionsCollectionStandalone>(); return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liaison), std::move(sessionsColl), nullptr, LogicalSessionCacheImpl::Options{}); + std::move(liaison), std::move(sessionsColl), nullptr /* reaper */); } } // namespace mongo |