diff options
30 files changed, 458 insertions, 695 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript index a070a881f0a..871429b589d 100644 --- a/src/mongo/SConscript +++ b/src/mongo/SConscript @@ -328,14 +328,14 @@ mongod = env.Program( 'db/catalog/index_key_validate', 'db/cloner', 'db/collection_index_usage_tracker', - 'db/commands/mongod', 'db/commands/mongod_fcv', + 'db/commands/mongod', 'db/commands/server_status_servers', 'db/common', 'db/concurrency/lock_manager', 'db/concurrency/write_conflict_exception', - 'db/curop', 'db/curop_metrics', + 'db/curop', 'db/db_raii', 'db/dbdirectclient', 'db/dbhelpers', @@ -343,9 +343,9 @@ mongod = env.Program( 'db/free_mon/free_mon_mongod', 'db/ftdc/ftdc_mongod', 'db/fts/ftsmongod', + 'db/index_builds_coordinator_mongod', 'db/index/index_access_method', 'db/index/index_descriptor', - 'db/index_builds_coordinator_mongod', 'db/initialize_snmp', 'db/introspect', 'db/keys_collection_client_direct', @@ -375,13 +375,13 @@ mongod = env.Program( 'db/repl/rs_rollback', 'db/repl/rslog', 'db/repl/serveronly_repl', - 'db/repl/storage_interface', 'db/repl/storage_interface_impl', + 'db/repl/storage_interface', 'db/repl/topology_coordinator', 'db/rw_concern_d', 'db/s/balancer', - 'db/s/commands_db_s', 'db/s/op_observer_sharding_impl', + 'db/s/sharding_commands_d', 'db/s/sharding_runtime_d', 'db/service_context_d', 'db/startup_warnings_mongod', @@ -392,8 +392,8 @@ mongod = env.Program( 'db/storage/biggie/storage_biggie', 'db/storage/devnull/storage_devnull', 'db/storage/ephemeral_for_test/storage_ephemeral_for_test', - 'db/storage/flow_control', 'db/storage/flow_control_parameters', + 'db/storage/flow_control', 'db/storage/storage_engine_lock_file', 'db/storage/storage_engine_metadata', 'db/storage/storage_init_d', @@ -403,8 +403,8 @@ mongod = env.Program( 'db/traffic_recorder', 'db/ttl_collection_cache', 'db/ttl_d', - 'db/update/update_driver', 'db/update_index_data', + 'db/update/update_driver', 'db/views/views_mongod', 'db/windows_options' if env.TargetOSIs('windows') else [], 
'executor/network_interface_factory', @@ -490,15 +490,19 @@ mongos = env.Program( LIBDEPS=[ 'db/audit', 'db/auth/authmongos', - 'db/commands/server_status', 'db/commands/server_status_core', 'db/commands/server_status_servers', + 'db/commands/server_status', 'db/curop', 'db/ftdc/ftdc_mongos', + 'db/logical_session_cache_impl', + 'db/logical_session_cache', 'db/logical_time_metadata_hook', 'db/mongodandmongos', - 'db/server_options', 'db/server_options_base', + 'db/server_options', + 'db/service_liaison_mongos', + 'db/sessions_collection_sharded', 'db/startup_warnings_common', 'db/stats/counters', 'db/windows_options' if env.TargetOSIs('windows') else [], @@ -507,10 +511,10 @@ mongos = env.Program( 's/committed_optime_metadata_hook', 's/coreshard', 's/is_mongos', + 's/query/cluster_cursor_cleanup_job', 's/sharding_egress_metadata_hook_for_mongos', 's/sharding_initialization', 's/sharding_router_api', - 's/query/cluster_cursor_cleanup_job', 'transport/message_compressor_options_server', 'transport/service_entry_point', 'transport/transport_layer_manager', diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9f4088dba01..93928f28e42 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1501,6 +1501,7 @@ env.Library( source=[ 'logical_session_cache.cpp', env.Idlc('logical_session_cache_stats.idl')[0], + env.Idlc('logical_session_cache.idl')[0], ], LIBDEPS=[ 'logical_session_id', @@ -1517,8 +1518,6 @@ env.Library( 'initialize_operation_session_info.cpp', 'logical_session_cache_impl.cpp', 'logical_session_server_status_section.cpp', - env.Idlc('logical_session_cache_impl.idl')[0], - env.Idlc('commands/end_sessions.idl')[0], ], LIBDEPS=[ 'logical_session_cache', @@ -1546,10 +1545,9 @@ env.Library( ) env.Library( - target='transaction_reaper', + target='transaction_reaper_d', source=[ - 'transaction_reaper.cpp', - env.Idlc('transaction_reaper.idl')[0], + 'transaction_reaper_d.cpp', ], LIBDEPS_PRIVATE=[ 
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface', @@ -1567,7 +1565,7 @@ envWithAsio.CppUnitTest( target='logical_session_cache_test', source=[ 'logical_session_cache_test.cpp', - 'transaction_reaper_test.cpp', + 'transaction_reaper_d_test.cpp', ], LIBDEPS=[ '$BUILD_DIR/mongo/executor/async_timer_mock', @@ -1584,7 +1582,7 @@ envWithAsio.CppUnitTest( 'service_context_d_test_fixture', 'service_liaison_mock', 'sessions_collection_mock', - 'transaction_reaper', + 'transaction_reaper_d', 'transaction', ], ) @@ -1604,20 +1602,7 @@ envWithAsio.Library( 'sessions_collection_standalone', ], LIBDEPS_PRIVATE=[ - 'transaction_reaper', - ], -) - -envWithAsio.Library( - target='logical_session_cache_factory_mongos', - source=[ - 'logical_session_cache_factory_mongos.cpp', - ], - LIBDEPS=[ - 'logical_session_cache', - 'logical_session_cache_impl', - 'service_liaison_mongos', - 'sessions_collection_sharded', + 'transaction_reaper_d', ], ) diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 7f83d0d192d..8c7fc3639ad 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -102,11 +102,12 @@ env.Library( "kill_all_sessions_command.cpp", "kill_sessions_command.cpp", "parameters.cpp", - env.Idlc('parameters.idl')[0], "refresh_logical_session_cache_now.cpp", "refresh_sessions_command.cpp", "rename_collection_common.cpp", "start_session_command.cpp", + env.Idlc('end_sessions.idl')[0], + env.Idlc('parameters.idl')[0], ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/bson/mutable/mutable_bson', diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp index 9822eebd60c..b8325015cf1 100644 --- a/src/mongo/db/commands/end_sessions_command.cpp +++ b/src/mongo/db/commands/end_sessions_command.cpp @@ -33,11 +33,13 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/end_sessions_gen.h" #include 
"mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" namespace mongo { +namespace { class EndSessionsCommand final : public BasicCommand { EndSessionsCommand(const EndSessionsCommand&) = delete; @@ -73,10 +75,10 @@ public: } } - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { + bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { auto lsCache = LogicalSessionCache::get(opCtx); auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj); @@ -84,6 +86,8 @@ public: lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx)); return true; } + } endSessionsCommand; +} // namespace } // namespace mongo diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp index 94ec16df197..31477531f64 100644 --- a/src/mongo/db/db.cpp +++ b/src/mongo/db/db.cpp @@ -511,7 +511,11 @@ ExitCode _initAndListen(int listenPort) { auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get()) ->initializeShardingAwarenessIfNeeded(startupOpCtx.get()); if (shardingInitialized) { - waitForShardRegistryReload(startupOpCtx.get()).transitional_ignore(); + auto status = waitForShardRegistryReload(startupOpCtx.get()); + if (!status.isOK()) { + LOG(0) << "Failed to load the shard registry as part of startup" + << causedBy(redact(status)); + } } auto storageEngine = serviceContext->getStorageEngine(); @@ -622,8 +626,7 @@ ExitCode _initAndListen(int listenPort) { kind = LogicalSessionCacheServer::kReplicaSet; } - auto sessionCache = makeLogicalSessionCacheD(kind); - LogicalSessionCache::set(serviceContext, std::move(sessionCache)); + LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheD(kind)); // MessageServer::run will return when exit code closes its socket and we don't need the // operation context anymore @@ -903,6 
+906,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) { balancer->waitForBalancerToStop(); } + // Join the logical session cache before the transport layer. + if (auto lsc = LogicalSessionCache::get(serviceContext)) { + lsc->joinOnShutDown(); + } + // Shutdown the TransportLayer so that new connections aren't accepted if (auto tl = serviceContext->getTransportLayer()) { log(LogComponent::kNetwork) << "shutdown: going to close listening sockets..."; diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 703ec97a337..e035ee13d7f 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -32,7 +32,7 @@ #include <boost/optional.hpp> #include "mongo/base/status.h" -#include "mongo/db/commands/end_sessions_gen.h" +#include "mongo/db/logical_session_cache_gen.h" #include "mongo/db/logical_session_cache_stats_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/refresh_sessions_gen.h" @@ -58,6 +58,11 @@ public: virtual ~LogicalSessionCache() = 0; /** + * Invoked on service shutdown time in order to join the cache's refresher and reaper tasks. + */ + virtual void joinOnShutDown() = 0; + + /** * If the cache contains a record for this LogicalSessionId, promotes that lsid * to be the most recently used and updates its lastUse date to be the current * time. Returns an error if the session was not found. diff --git a/src/mongo/db/logical_session_cache_impl.idl b/src/mongo/db/logical_session_cache.idl index 71cd63e1ed3..3294df14378 100644 --- a/src/mongo/db/logical_session_cache_impl.idl +++ b/src/mongo/db/logical_session_cache.idl @@ -1,4 +1,4 @@ -# Copyright (C) 2018-present MongoDB, Inc. +# Copyright (C) 2019-present MongoDB, Inc. 
# # This program is free software: you can redistribute it and/or modify # it under the terms of the Server Side Public License, version 1, @@ -52,3 +52,24 @@ server_parameters: cpp_vartype: bool cpp_varname: disableLogicalSessionCacheRefresh default: false + + TransactionRecordMinimumLifetimeMinutes: + # The minimum lifetime for a transaction record is how long it has to have lived on the server + # before we'll consider it for cleanup. This is effectively the window for how long it is + # permissible for a mongos to hang before we're willing to accept a failure of the retryable + # writes subsystem. + # + # Specifically, we imagine that a client connects to one mongos on a session and performs a + # retryable write after which that mongos hangs. Then the client connects to a new mongos on the + # same session and successfully executes its write. After a day passes, the session will time + # out, cleaning up the retryable write. Then the original mongos wakes up, vivifies the session + # and executes the write (because all records of the session + transaction have been deleted). + # + # So the write is performed twice, which is unavoidable without losing session vivification + # and/or requiring synchronized clocks all the way out to the client. In lieu of that we + # provide a weaker guarantee after the minimum transaction lifetime. + description: The minimum lifetime for a transaction record. 
+ set_at: startup + cpp_vartype: int + cpp_varname: gTransactionRecordMinimumLifetimeMinutes + default: 30 diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp index 6610702c1ca..2d9870e258a 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp @@ -31,8 +31,6 @@ #include "mongo/platform/basic.h" -#include <memory> - #include "mongo/db/logical_session_cache_factory_mongod.h" #include "mongo/db/logical_session_cache_impl.h" @@ -41,14 +39,14 @@ #include "mongo/db/sessions_collection_rs.h" #include "mongo/db/sessions_collection_sharded.h" #include "mongo/db/sessions_collection_standalone.h" -#include "mongo/db/transaction_reaper.h" +#include "mongo/db/transaction_reaper_d.h" #include "mongo/stdx/memory.h" #include "mongo/util/log.h" namespace mongo { std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) { - auto liaison = stdx::make_unique<ServiceLiaisonMongod>(); + auto liaison = std::make_unique<ServiceLiaisonMongod>(); auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> { switch (state) { @@ -65,22 +63,8 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach MONGO_UNREACHABLE; }(); - auto reaper = [&]() -> std::shared_ptr<TransactionReaper> { - switch (state) { - case LogicalSessionCacheServer::kSharded: - return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl); - case LogicalSessionCacheServer::kReplicaSet: - return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl); - case LogicalSessionCacheServer::kConfigServer: - case LogicalSessionCacheServer::kStandalone: - return nullptr; - } - - MONGO_UNREACHABLE; - }(); - - return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liaison), std::move(sessionsColl), std::move(reaper)); + return std::make_unique<LogicalSessionCacheImpl>( + 
std::move(liaison), std::move(sessionsColl), TransactionReaperD::reapSessionsOlderThan); } } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp deleted file mode 100644 index 2f8942c8d85..00000000000 --- a/src/mongo/db/logical_session_cache_factory_mongos.cpp +++ /dev/null @@ -1,51 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. 
- */ - -#include "mongo/platform/basic.h" - -#include <memory> - -#include "mongo/db/logical_session_cache_factory_mongos.h" - -#include "mongo/db/logical_session_cache_impl.h" -#include "mongo/db/service_liaison_mongos.h" -#include "mongo/db/sessions_collection_sharded.h" -#include "mongo/stdx/memory.h" - -namespace mongo { - -std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() { - auto liaison = stdx::make_unique<ServiceLiaisonMongos>(); - auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>(); - - return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liaison), std::move(sessionsColl), nullptr /* reaper */); -} - -} // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 24d278c9150..2095595eb5f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -33,7 +33,6 @@ #include "mongo/db/logical_session_cache_impl.h" -#include "mongo/db/logical_session_cache_impl_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" @@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) { } // namespace -LogicalSessionCacheImpl::LogicalSessionCacheImpl( - std::unique_ptr<ServiceLiaison> service, - std::shared_ptr<SessionsCollection> collection, - std::shared_ptr<TransactionReaper> transactionReaper) +LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, + std::shared_ptr<SessionsCollection> collection, + ReapSessionsOlderThanFn reapSessionsOlderThanFn) : _service(std::move(service)), _sessionsColl(std::move(collection)), - _transactionReaper(std::move(transactionReaper)) { + _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) { _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setLastTransactionReaperJobTimestamp(now()); @@ -73,26 +71,22 @@ 
LogicalSessionCacheImpl::LogicalSessionCacheImpl( [this](Client* client) { _periodicRefresh(client); }, Milliseconds(logicalSessionRefreshMillis)}); - if (_transactionReaper) { - _service->scheduleJob({"LogicalSessionCacheReap", - [this](Client* client) { _periodicReap(client); }, - Milliseconds(logicalSessionRefreshMillis)}); - } + _service->scheduleJob({"LogicalSessionCacheReap", + [this](Client* client) { _periodicReap(client); }, + Milliseconds(logicalSessionRefreshMillis)}); } } LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { - try { - _service->join(); - } catch (...) { - // If we failed to join we might still be running a background thread, log but swallow the - // error since there is no good way to recover - severe() << "Failed to join background service thread"; - } + joinOnShutDown(); +} + +void LogicalSessionCacheImpl::joinOnShutDown() { + _service->join(); } Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto it = _activeSessions.find(lsid); if (it == _activeSessions.end()) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; @@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() { } size_t LogicalSessionCacheImpl::size() { - stdx::lock_guard<stdx::mutex> lock(_cacheMutex); + stdx::lock_guard<stdx::mutex> lock(_mutex); return _activeSessions.size(); } @@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) { } Status LogicalSessionCacheImpl::_reap(Client* client) { - if (!_transactionReaper) { - return Status::OK(); - } - // Take the lock to update some stats. { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); // Clear the last set of stats for our new run. 
_stats.setLastTransactionReaperJobDurationMillis(0); @@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); } - int numReaped = 0; + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } - try { - boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; - auto* const opCtx = [&client, &uniqueCtx] { - if (client->getOperationContext()) { - return client->getOperationContext(); - } + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); - uniqueCtx.emplace(client->makeOperationContext()); - return uniqueCtx->get(); - }(); + int numReaped = 0; + try { ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); }); auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx); @@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } - stdx::lock_guard<stdx::mutex> lk(_reaperMutex); - numReaped = _transactionReaper->reap(opCtx); - } catch (...) 
{ + numReaped = + _reapSessionsOlderThanFn(opCtx, + *_sessionsColl, + opCtx->getServiceContext()->getFastClockSource()->now() - + Minutes(gTransactionRecordMinimumLifetimeMinutes)); + } catch (const DBException& ex) { { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); } - return exceptionToStatus(); + return ex.toStatus(); } { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); @@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { void LogicalSessionCacheImpl::_refresh(Client* client) { // Stats for serverStatus: { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); // Clear the refresh-related stats with the beginning of our run. _stats.setLastSessionsCollectionJobDurationMillis(0); @@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // This will finish timing _refresh for our stats no matter when we return. 
const auto timeRefreshJob = makeGuard([this] { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp(); _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); }); @@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { { using std::swap; - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); swap(explicitlyEndingSessions, _endingSessions); swap(activeSessions, _activeSessions); } @@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // swapped out of LogicalSessionCache, and merges in any records that had been added since we // swapped them out. auto backSwap = [this](auto& member, auto& temp) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); using std::swap; swap(member, temp); for (const auto& it : temp) { @@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { activeSessions.erase(lsid); } - // refresh all recently active sessions as well as for sessions attached to running ops - - LogicalSessionRecordSet activeSessionRecords{}; + // Refresh all recently active sessions as well as for sessions attached to running ops + LogicalSessionRecordSet activeSessionRecords; auto runningOpSessions = _service->getActiveOpSessions(); @@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); activeSessionsBackSwapper.dismiss(); { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size()); } @@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); explicitlyEndingBackSwaper.dismiss(); { - 
stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size()); } @@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // Exclude sessions added to _activeSessions from the openCursorSession to avoid race between // killing cursors on the removed sessions and creating sessions. { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); for (const auto& it : _activeSessions) { auto newSessionIt = openCursorSessions.find(it.first); @@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { SessionKiller::Matcher matcher(std::move(patterns)); auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)); { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second); } } void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _endingSessions.insert(begin(sessions), end(sessions)); } LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setActiveSessionsCount(_activeSessions.size()); return _stats; } Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) { return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"}; } + _activeSessions.insert(std::make_pair(record.getId(), record)); return Status::OK(); } std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + 
stdx::lock_guard<stdx::mutex> lk(_mutex); std::vector<LogicalSessionId> ret; ret.reserve(_activeSessions.size()); for (const auto& id : _activeSessions) { @@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( const std::vector<SHA256Block>& userDigests) const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); std::vector<LogicalSessionId> ret; for (const auto& it : _activeSessions) { if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != @@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( const LogicalSessionId& id) const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); const auto it = _activeSessions.find(id); if (it == _activeSessions.end()) { return boost::none; } return it->second; } + } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index b8c4001ace6..96c7f80d717 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -30,23 +30,12 @@ #pragma once #include "mongo/db/logical_session_cache.h" -#include "mongo/db/logical_session_cache_impl_gen.h" -#include "mongo/db/logical_session_id.h" -#include "mongo/db/refresh_sessions_gen.h" #include "mongo/db/service_liaison.h" #include "mongo/db/sessions_collection.h" -#include "mongo/db/time_proof_service.h" -#include "mongo/db/transaction_reaper.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/stdx/thread.h" -#include "mongo/util/lru_cache.h" +#include "mongo/util/functional.h" namespace mongo { -class Client; -class OperationContext; -class ServiceContext; - /** * A thread-safe cache structure for logical session records. 
* @@ -63,18 +52,20 @@ class ServiceContext; */ class LogicalSessionCacheImpl final : public LogicalSessionCache { public: - /** - * Construct a new session cache. - */ + using ReapSessionsOlderThanFn = + unique_function<int(OperationContext*, SessionsCollection&, Date_t)>; + LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, std::shared_ptr<SessionsCollection> collection, - std::shared_ptr<TransactionReaper> transactionReaper); + ReapSessionsOlderThanFn reapSessionsOlderThanFn); LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete; LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete; ~LogicalSessionCacheImpl(); + void joinOnShutDown() override; + Status promote(LogicalSessionId lsid) override; Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override; @@ -126,23 +117,19 @@ private: */ Status _addToCache(LogicalSessionRecord record); - // This value is only modified under the lock, and is modified - // automatically by the background jobs. 
- LogicalSessionCacheStats _stats; - std::unique_ptr<ServiceLiaison> _service; std::shared_ptr<SessionsCollection> _sessionsColl; + ReapSessionsOlderThanFn _reapSessionsOlderThanFn; - mutable stdx::mutex _reaperMutex; - std::shared_ptr<TransactionReaper> _transactionReaper; - - mutable stdx::mutex _cacheMutex; + mutable stdx::mutex _mutex; LogicalSessionIdMap<LogicalSessionRecord> _activeSessions; LogicalSessionIdSet _endingSessions; - Date_t lastRefreshTime; + Date_t _lastRefreshTime; + + LogicalSessionCacheStats _stats; }; } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index a81f6ee8299..8b5bb312102 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -42,6 +42,8 @@ class ServiceContext; */ class LogicalSessionCacheNoop : public LogicalSessionCache { public: + void joinOnShutDown() override {} + Status promote(LogicalSessionId lsid) override { return Status::OK(); } diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index e7b61b664b0..a5cc6e8eed0 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -79,7 +79,11 @@ public: auto mockService = stdx::make_unique<MockServiceLiaison>(_service); auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions); _cache = stdx::make_unique<LogicalSessionCacheImpl>( - std::move(mockService), std::move(mockSessions), nullptr /* reaper */); + std::move(mockService), + std::move(mockSessions), + [](OperationContext*, SessionsCollection&, Date_t) { + return 0; /* No op*/ + }); } void waitUntilRefreshScheduled() { diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp index d994458332f..160e718201d 100644 --- a/src/mongo/db/logical_session_id_test.cpp +++ b/src/mongo/db/logical_session_id_test.cpp @@ -92,7 +92,11 @@ public: 
std::make_shared<MockSessionsCollectionImpl>()); auto localLogicalSessionCache = std::make_unique<LogicalSessionCacheImpl>( - std::move(localServiceLiaison), std::move(localSessionsCollection), nullptr); + std::move(localServiceLiaison), + std::move(localSessionsCollection), + [](OperationContext*, SessionsCollection&, Date_t) { + return 0; /* No op*/ + }); LogicalSessionCache::set(getServiceContext(), std::move(localLogicalSessionCache)); } diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript index 0fc1b35ee08..54050af3feb 100644 --- a/src/mongo/db/s/SConscript +++ b/src/mongo/db/s/SConscript @@ -278,7 +278,7 @@ env.Library( ) env.Library( - target='commands_db_s', + target='sharding_commands_d', source=[ 'add_shard_cmd.cpp', 'check_sharding_index_command.cpp', @@ -372,20 +372,25 @@ env.CppUnitTest( 'migration_chunk_cloner_source_legacy_test.cpp', 'migration_destination_manager_test.cpp', 'namespace_metadata_change_notifications_test.cpp', + 'session_catalog_migration_destination_test.cpp', + 'session_catalog_migration_source_test.cpp', 'shard_metadata_util_test.cpp', 'shard_server_catalog_cache_loader_test.cpp', 'sharding_initialization_mongod_test.cpp', 'sharding_initialization_op_observer_test.cpp', 'split_vector_test.cpp', ], - LIBDEPS=[ - 'sharding_runtime_d', + LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/db/auth/authmocks', + '$BUILD_DIR/mongo/db/logical_session_cache_impl', + '$BUILD_DIR/mongo/db/ops/write_ops_exec', '$BUILD_DIR/mongo/db/query/query_request', + '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', '$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', '$BUILD_DIR/mongo/s/shard_server_test_fixture', + 'sharding_runtime_d', ], ) @@ -427,32 +432,6 @@ env.CppUnitTest( ) env.CppUnitTest( - target='session_catalog_migration_source_test', - source=[ - 'session_catalog_migration_source_test.cpp', - ], - LIBDEPS=[ - 
'$BUILD_DIR/mongo/db/auth/authmocks', - '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture', - 'sharding_runtime_d', - ] -) - -env.CppUnitTest( - target='session_catalog_migration_destination_test', - source=[ - 'session_catalog_migration_destination_test.cpp', - ], - LIBDEPS=[ - '$BUILD_DIR/mongo/db/auth/authmocks', - '$BUILD_DIR/mongo/db/ops/write_ops_exec', - '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock', - '$BUILD_DIR/mongo/s/shard_server_test_fixture', - 'sharding_runtime_d', - ] -) - -env.CppUnitTest( target='sharding_catalog_manager_test', source=[ 'config/initial_split_policy_test.cpp', diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index fda30fb70f3..7ffc762201e 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -212,7 +212,7 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N } Status SessionsCollection::doRefresh(const NamespaceString& ns, - const LogicalSessionRecordSet& sessions, + const std::vector<LogicalSessionRecord>& sessions, SendBatchFn send) { auto init = [ns](BSONObjBuilder* batch) { batch->append("update", ns.coll()); @@ -230,7 +230,7 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns, } Status SessionsCollection::doRemove(const NamespaceString& ns, - const LogicalSessionIdSet& sessions, + const std::vector<LogicalSessionId>& sessions, SendBatchFn send) { auto init = [ns](BSONObjBuilder* batch) { batch->append("delete", ns.coll()); @@ -245,16 +245,15 @@ Status SessionsCollection::doRemove(const NamespaceString& ns, return runBulkCmd("deletes", init, add, send, sessions); } -StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns, - const LogicalSessionIdSet& sessions, - FindBatchFn send) { +StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved( + const NamespaceString& ns, const std::vector<LogicalSessionId>& sessions, FindBatchFn send) { auto makeT = 
[] { return std::vector<LogicalSessionId>{}; }; auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) { batch.push_back(record); }; - LogicalSessionIdSet removed = sessions; + LogicalSessionIdSet removed{sessions.begin(), sessions.end()}; auto wrappedSend = [&](BSONObj batch) { auto swBatchResult = send(batch); diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index b8bdfe83ed2..64e78c8a476 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -102,31 +102,34 @@ protected: * Makes a send function for the given client. */ using SendBatchFn = stdx::function<Status(BSONObj batch)>; - SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client); - SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client); + static SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client); + static SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client); + using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>; - FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client); + static FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client); /** * Formats and sends batches of refreshes for the given set of sessions. */ Status doRefresh(const NamespaceString& ns, - const LogicalSessionRecordSet& sessions, + const std::vector<LogicalSessionRecord>& sessions, SendBatchFn send); /** * Formats and sends batches of deletes for the given set of sessions. */ Status doRemove(const NamespaceString& ns, - const LogicalSessionIdSet& sessions, + const std::vector<LogicalSessionId>& sessions, SendBatchFn send); /** - * Formats and sends batches of fetches for the given set of sessions. 
+ * Returns those lsids from the input 'sessions' array which are not present in the sessions + * collection (essentially performs an inner join of 'sessions' against the sessions + * collection). */ - StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns, - const LogicalSessionIdSet& sessions, - FindBatchFn send); + StatusWith<LogicalSessionIdSet> doFindRemoved(const NamespaceString& ns, + const std::vector<LogicalSessionId>& sessions, + FindBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index 7abc9dfd1ae..0069d3372f3 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -207,18 +207,20 @@ Status SessionsCollectionRS::checkSessionsCollectionExists(OperationContext* opC Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { + const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end()); + return dispatch(NamespaceString::kLogicalSessionsNamespace, opCtx, [&] { DBDirectClient client(opCtx); return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessions, + sessionsVector, makeSendFnForBatchWrite( NamespaceString::kLogicalSessionsNamespace, &client)); }, [&](DBClientBase* client) { return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessions, + sessionsVector, makeSendFnForBatchWrite( NamespaceString::kLogicalSessionsNamespace, client)); }); @@ -226,18 +228,20 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); + return dispatch(NamespaceString::kLogicalSessionsNamespace, opCtx, [&] { DBDirectClient client(opCtx); return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessions, + sessionsVector, 
makeSendFnForBatchWrite( NamespaceString::kLogicalSessionsNamespace, &client)); }, [&](DBClientBase* client) { return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessions, + sessionsVector, makeSendFnForBatchWrite( NamespaceString::kLogicalSessionsNamespace, client)); }); @@ -245,21 +249,24 @@ Status SessionsCollectionRS::removeRecords(OperationContext* opCtx, StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { - return dispatch(NamespaceString::kLogicalSessionsNamespace, - opCtx, - [&] { - DBDirectClient client(opCtx); - return doFetch(NamespaceString::kLogicalSessionsNamespace, - sessions, - makeFindFnForCommand( - NamespaceString::kLogicalSessionsNamespace, &client)); - }, - [&](DBClientBase* client) { - return doFetch(NamespaceString::kLogicalSessionsNamespace, - sessions, - makeFindFnForCommand( - NamespaceString::kLogicalSessionsNamespace, client)); - }); + const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end()); + + return dispatch( + NamespaceString::kLogicalSessionsNamespace, + opCtx, + [&] { + DBDirectClient client(opCtx); + return doFindRemoved( + NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client)); + }, + [&](DBClientBase* client) { + return doFindRemoved( + NamespaceString::kLogicalSessionsNamespace, + sessionsVector, + makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, client)); + }); } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index cf1be44431c..e65ddd0dab5 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -40,6 +40,8 @@ #include "mongo/rpc/op_msg.h" #include "mongo/rpc/op_msg_rpc_impls.h" #include "mongo/s/catalog_cache.h" +#include "mongo/s/client/shard.h" +#include 
"mongo/s/client/shard_registry.h" #include "mongo/s/grid.h" #include "mongo/s/query/cluster_find.h" #include "mongo/s/write_ops/batch_write_exec.h" @@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"}; } +std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + opCtx, NamespaceString::kLogicalSessionsNamespace)); + auto cm = routingInfo.cm(); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace + << " is not sharded", + cm); + + std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard; + for (const auto& session : sessions) { + sessionIdsByOwningShard.emplace( + cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + session); + } + + std::vector<LogicalSessionId> sessionIdsGroupedByShard; + sessionIdsGroupedByShard.reserve(sessions.size()); + for (auto& session : sessionIdsByOwningShard) { + sessionIdsGroupedByShard.emplace_back(std::move(session.second)); + } + + return sessionIdsGroupedByShard; +} + +std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard( + OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { + auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo( + opCtx, NamespaceString::kLogicalSessionsNamespace)); + auto cm = routingInfo.cm(); + + uassert(ErrorCodes::NamespaceNotSharded, + str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace + << " is not sharded", + cm); + + std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard; + for (const auto& session : sessions) { + sessionsByOwningShard.emplace( 
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(), + session); + } + + std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard; + sessionRecordsGroupedByShard.reserve(sessions.size()); + for (auto& session : sessionsByOwningShard) { + sessionRecordsGroupedByShard.emplace_back(std::move(session.second)); + } + + return sessionRecordsGroupedByShard; +} + Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) { return checkSessionsCollectionExists(opCtx); } @@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, return response.toStatus(); }; - return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doRefresh(NamespaceString::kLogicalSessionsNamespace, + _groupSessionRecordsByOwningShard(opCtx, sessions), + send); } Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, @@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, return response.toStatus(); }; - return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doRemove(NamespaceString::kLogicalSessionsNamespace, + _groupSessionIdsByOwningShard(opCtx, sessions), + send); } StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( @@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions( return replyBuilder.releaseBody(); }; - return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send); + return doFindRemoved(NamespaceString::kLogicalSessionsNamespace, + _groupSessionIdsByOwningShard(opCtx, sessions), + send); } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index bd377d369bd..95aba591cb7 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -73,6 +73,20 @@ public: protected: Status 
_checkCacheForSessionsCollection(OperationContext* opCtx); + + /** + * These two methods use the sharding routing metadata to do a best effort attempt at grouping + * the specified set of sessions by the shards, which have the records for these sessions. This + * is done as an attempt to avoid broadcast queries. + * + * The reason it is 'best effort' is because it makes no attempt at checking whether the routing + * table is up-to-date and just picks up whatever was most recently fetched from the config + * server, which could be stale. + */ + std::vector<LogicalSessionId> _groupSessionIdsByOwningShard( + OperationContext* opCtx, const LogicalSessionIdSet& sessions); + std::vector<LogicalSessionRecord> _groupSessionRecordsByOwningShard( + OperationContext* opCtx, const LogicalSessionRecordSet& sessions); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index 32e93491fbf..25ae4db3dfa 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -100,7 +100,7 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, const LogicalSessionRecordSet& sessions) { DBDirectClient client(opCtx); return doRefresh(NamespaceString::kLogicalSessionsNamespace, - sessions, + std::vector(sessions.begin(), sessions.end()), makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); } @@ -108,16 +108,16 @@ Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) { DBDirectClient client(opCtx); return doRemove(NamespaceString::kLogicalSessionsNamespace, - sessions, + std::vector(sessions.begin(), sessions.end()), makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client)); } StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) { 
DBDirectClient client(opCtx); - return doFetch(NamespaceString::kLogicalSessionsNamespace, - sessions, - makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client)); + return doFindRemoved(NamespaceString::kLogicalSessionsNamespace, + std::vector(sessions.begin(), sessions.end()), + makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client)); } } // namespace mongo diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp deleted file mode 100644 index e1ac967ef59..00000000000 --- a/src/mongo/db/transaction_reaper.cpp +++ /dev/null @@ -1,289 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. 
If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/db/transaction_reaper.h" - -#include "mongo/bson/bsonmisc.h" -#include "mongo/db/dbdirectclient.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/ops/write_ops.h" -#include "mongo/db/repl/replication_coordinator.h" -#include "mongo/db/session_txn_record_gen.h" -#include "mongo/db/sessions_collection.h" -#include "mongo/db/transaction_reaper_gen.h" -#include "mongo/platform/atomic_word.h" -#include "mongo/s/catalog_cache.h" -#include "mongo/s/client/shard.h" -#include "mongo/s/client/shard_registry.h" -#include "mongo/s/grid.h" -#include "mongo/s/write_ops/batched_command_response.h" -#include "mongo/util/scopeguard.h" - -namespace mongo { -namespace { - -const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); -const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); -const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; - -/** - * Makes the query we'll use to scan the transactions table. - * - * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt - * to pull records likely to be on the same chunks (because they sort near each other). - */ -Query makeQuery(Date_t now) { - const Date_t possiblyExpired(now - Minutes(gTransactionRecordMinimumLifetimeMinutes)); - Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)); - query.sort(kSortById); - return query; -} - -/** - * Our impl is templatized on a type which handles the lsids we see. It provides the top level - * scaffolding for figuring out if we're the primary node responsible for the transaction table and - * invoking the handler. - * - * The handler here will see all of the possibly expired txn ids in the transaction table and will - * have a lifetime associated with a single call to reap. 
- */ -template <typename Handler> -class TransactionReaperImpl final : public TransactionReaper { -public: - TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection) - : _collection(std::move(collection)) {} - - int reap(OperationContext* opCtx) override { - Handler handler(opCtx, *_collection); - if (!handler.initialize()) { - return 0; - } - - // Make a best-effort attempt to only reap when the node is running as a primary - const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx); - if (!coord->canAcceptWritesForDatabase_UNSAFE( - opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) { - return 0; - } - - DBDirectClient client(opCtx); - - // Fill all stale config.transactions entries - auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now()); - auto cursor = client.query( - NamespaceString::kSessionTransactionsTableNamespace, query, 0, 0, &kIdProjection); - - while (cursor->more()) { - auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( - "TransactionSession"_sd, cursor->next()); - - handler.handleLsid(transactionSession.get_id()); - } - - // Before the handler goes out of scope, flush its last batch to disk and collect stats. - return handler.finalize(); - } - -private: - std::shared_ptr<SessionsCollection> _collection; -}; - -/** - * Removes the specified set of session ids from the persistent sessions collection and returns the - * number of sessions actually removed. 
- */ -int removeSessionsTransactionRecords(OperationContext* opCtx, - SessionsCollection& sessionsCollection, - const LogicalSessionIdSet& sessionIdsToRemove) { - if (sessionIdsToRemove.empty()) { - return 0; - } - - // From the passed-in sessions, find the ones which are actually expired/removed - auto expiredSessionIds = - uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); - - DBDirectClient client(opCtx); - int numDeleted = 0; - - for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) { - write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace); - deleteOp.setWriteCommandBase([] { - write_ops::WriteCommandBase base; - base.setOrdered(false); - return base; - }()); - deleteOp.setDeletes([&] { - // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object - // size limit - const int kMaxBatchSize = 10'000; - std::vector<write_ops::DeleteOpEntry> entries; - for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) { - entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()), - false /* multi = false */); - } - return entries; - }()); - - BSONObj result; - client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(), - deleteOp.toBSON({}), - result); - - BatchedCommandResponse response; - std::string errmsg; - uassert(ErrorCodes::FailedToParse, - str::stream() << "Failed to parse response " << result, - response.parseBSON(result, &errmsg)); - uassertStatusOK(response.getTopLevelStatus()); - - numDeleted += response.getN(); - } - - return numDeleted; -} - -/** - * The repl impl is simple, just pass along to the sessions collection for checking ids locally - */ -class ReplHandler { -public: - ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} - - bool initialize() { - return true; - } - - void handleLsid(const 
LogicalSessionId& lsid) { - _batch.insert(lsid); - - if (_batch.size() >= write_ops::kMaxWriteBatchSize) { - _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch); - _batch.clear(); - } - } - - int finalize() { - invariant(!_finalized); - _finalized = true; - - _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch); - return _numReaped; - } - -private: - OperationContext* const _opCtx; - SessionsCollection& _sessionsCollection; - - LogicalSessionIdSet _batch; - - int _numReaped{0}; - - bool _finalized{false}; -}; - -/** - * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small - * scans. - */ -class ShardedHandler { -public: - ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection) - : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {} - - // Returns false if the sessions collection is not set up. - bool initialize() { - auto routingInfo = - uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo( - _opCtx, NamespaceString::kLogicalSessionsNamespace)); - _cm = routingInfo.cm(); - return !!_cm; - } - - void handleLsid(const LogicalSessionId& lsid) { - invariant(_cm); - - // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain - // batches of lsids, which only fall on the same shard, so that the query to check whether - // they are alive doesn't need to do cross-shard scatter/gather queries - const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON()); - const auto& shardId = chunk.getShardId(); - - auto& lsids = _shards[shardId]; - lsids.insert(lsid); - - if (lsids.size() >= write_ops::kMaxWriteBatchSize) { - _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids); - _shards.erase(shardId); - } - } - - int finalize() { - invariant(!_finalized); - _finalized = true; - - for (const auto& pair : _shards) { - _numReaped += - 
removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second); - } - - return _numReaped; - } - -private: - OperationContext* const _opCtx; - SessionsCollection& _sessionsCollection; - - std::shared_ptr<ChunkManager> _cm; - - stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; - int _numReaped{0}; - - bool _finalized{false}; -}; - -} // namespace - -std::unique_ptr<TransactionReaper> TransactionReaper::make( - Type type, std::shared_ptr<SessionsCollection> collection) { - switch (type) { - case Type::kReplicaSet: - return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection)); - case Type::kSharded: - return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection)); - } - MONGO_UNREACHABLE; -} - -TransactionReaper::~TransactionReaper() = default; - -} // namespace mongo diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h deleted file mode 100644 index 4c3c252e634..00000000000 --- a/src/mongo/db/transaction_reaper.h +++ /dev/null @@ -1,63 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. 
- * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#pragma once - -#include <memory> - -namespace mongo { - -class ServiceContext; -class SessionsCollection; -class OperationContext; - -/** - * TransactionReaper is responsible for scanning the transaction table, checking if sessions are - * still alive and deleting the transaction records if their sessions have expired. - */ -class TransactionReaper { -public: - enum class Type { - kReplicaSet, - kSharded, - }; - - virtual ~TransactionReaper() = 0; - - virtual int reap(OperationContext* OperationContext) = 0; - - /** - * The implementation of the sessions collections is different in replica sets versus sharded - * clusters, so we have a factory to pick the right impl. - */ - static std::unique_ptr<TransactionReaper> make(Type type, - std::shared_ptr<SessionsCollection> collection); -}; - -} // namespace mongo diff --git a/src/mongo/db/transaction_reaper.idl b/src/mongo/db/transaction_reaper.idl deleted file mode 100644 index 7146d869a3f..00000000000 --- a/src/mongo/db/transaction_reaper.idl +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) 2019-present MongoDB, Inc. 
-# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -global: - cpp_namespace: "mongo" - -server_parameters: - TransactionRecordMinimumLifetimeMinutes: - # The minimum lifetime for a transaction record is how long it has to have lived on the server - # before we'll consider it for cleanup. This is effectively the window for how long it is - # permissible for a mongos to hang before we're willing to accept a failure of the retryable write - # subsystem. - # - # Specifically, we imagine that a client connects to one mongos on a session and performs a - # retryable write. That mongos hangs. 
Then the client connects to a new mongos on the same - # session and successfully executes its write. After a day passes, the session will time out, - # cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and - # executes the write (because all records of the session + transaction have been deleted). - # - # So the write is performed twice, which is unavoidable without losing session vivification and/or - # requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker - # guarantee after the minimum transaction lifetime. - description: 'The minimum lifetime for a transaction record.' - set_at: startup - cpp_vartype: int - cpp_varname: gTransactionRecordMinimumLifetimeMinutes - default: 30 diff --git a/src/mongo/db/transaction_reaper_d.cpp b/src/mongo/db/transaction_reaper_d.cpp new file mode 100644 index 00000000000..1370c60c4f6 --- /dev/null +++ b/src/mongo/db/transaction_reaper_d.cpp @@ -0,0 +1,133 @@ +/** + * Copyright (C) 2019-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. 
You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/transaction_reaper_d.h" + +#include "mongo/bson/bsonmisc.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/ops/write_ops.h" +#include "mongo/db/session_txn_record_gen.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/s/write_ops/batched_command_response.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { +namespace { + +const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1); +const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1); +const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName; + +/** + * Removes the specified set of session ids from the persistent sessions collection and returns the + * number of sessions actually removed. 
+ */ +int removeSessionsTransactionRecords(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + const LogicalSessionIdSet& sessionIdsToRemove) { + if (sessionIdsToRemove.empty()) { + return 0; + } + + // From the passed-in sessions, find the ones which are actually expired/removed + auto expiredSessionIds = + uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove)); + + write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace); + deleteOp.setWriteCommandBase([] { + write_ops::WriteCommandBase base; + base.setOrdered(false); + return base; + }()); + deleteOp.setDeletes([&] { + std::vector<write_ops::DeleteOpEntry> entries; + for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end(); ++it) { + entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()), + false /* multi = false */); + } + return entries; + }()); + + BSONObj result; + + DBDirectClient client(opCtx); + client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(), + deleteOp.toBSON({}), + result); + + BatchedCommandResponse response; + std::string errmsg; + uassert(ErrorCodes::FailedToParse, + str::stream() << "Failed to parse response " << result, + response.parseBSON(result, &errmsg)); + uassertStatusOK(response.getTopLevelStatus()); + + return response.getN(); +} + +} // namespace + +int TransactionReaperD::reapSessionsOlderThan(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + Date_t possiblyExpired) { + // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index + DBDirectClient client(opCtx); + auto cursor = + client.query(NamespaceString::kSessionTransactionsTableNamespace, + Query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)).sort(kSortById), + 0, + 0, + &kIdProjection); + + // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object + // size limit + LogicalSessionIdSet lsids; + const 
int kMaxBatchSize = 10'000; + + int numReaped = 0; + while (cursor->more()) { + auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse( + "TransactionSession"_sd, cursor->next()); + + lsids.insert(transactionSession.get_id()); + if (lsids.size() > kMaxBatchSize) { + numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids); + lsids.clear(); + } + } + + numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids); + + return numReaped; +} + +} // namespace mongo diff --git a/src/mongo/db/logical_session_cache_factory_mongos.h b/src/mongo/db/transaction_reaper_d.h index 739c633631d..057583b0ec9 100644 --- a/src/mongo/db/logical_session_cache_factory_mongos.h +++ b/src/mongo/db/transaction_reaper_d.h @@ -1,5 +1,5 @@ /** - * Copyright (C) 2018-present MongoDB, Inc. + * Copyright (C) 2019-present MongoDB, Inc. * * This program is free software: you can redistribute it and/or modify * it under the terms of the Server Side Public License, version 1, @@ -29,13 +29,18 @@ #pragma once -#include <memory> - -#include "mongo/db/logical_session_cache.h" -#include "mongo/db/service_liaison.h" +#include "mongo/util/time_support.h" namespace mongo { -std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS(); +class SessionsCollection; +class OperationContext; + +class TransactionReaperD { +public: + static int reapSessionsOlderThan(OperationContext* opCtx, + SessionsCollection& sessionsCollection, + Date_t possiblyExpired); +}; } // namespace mongo diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/transaction_reaper_d_test.cpp index 546ba58568a..165ed3597a0 100644 --- a/src/mongo/db/transaction_reaper_test.cpp +++ b/src/mongo/db/transaction_reaper_d_test.cpp @@ -34,7 +34,7 @@ #include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/sessions_collection_mock.h" -#include "mongo/db/transaction_reaper.h" +#include 
"mongo/db/transaction_reaper_d.h" #include "mongo/util/clock_source_mock.h" namespace mongo { @@ -61,15 +61,12 @@ protected: std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{ std::make_shared<MockSessionsCollectionImpl>()}; - std::unique_ptr<TransactionReaper> _reaper{ - TransactionReaper::make(TransactionReaper::Type::kReplicaSet, - std::make_shared<MockSessionsCollection>(_collectionMock))}; + std::shared_ptr<SessionsCollection> _collection{ + std::make_shared<MockSessionsCollection>(_collectionMock)}; }; TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) { - _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now())); - _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now())); - + // Create some "old" sessions DBDirectClient client(_opCtx); SessionTxnRecord txn1( makeLogicalSessionIdForTest(), 100, repl::OpTime(Timestamp(100), 1), clock()->now()); @@ -79,8 +76,15 @@ TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) { client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), std::vector{txn1.toBSON(), txn2.toBSON()}); + // Add some "new" sessions to ensure they don't get reaped clock()->advance(Minutes{31}); - ASSERT_EQ(2, _reaper->reap(_opCtx)); + _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now())); + _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now())); + + + ASSERT_EQ(2, + TransactionReaperD::reapSessionsOlderThan( + _opCtx, *_collection, clock()->now() - Minutes{30})); } } // namespace diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp index ae9f55f6ae8..fa606b87c3f 100644 --- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp +++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp @@ -31,8 +31,6 @@ #include "mongo/platform/basic.h" -#include <memory> - #include 
"mongo/embedded/logical_session_cache_factory_embedded.h" #include "mongo/db/logical_session_cache_impl.h" @@ -46,11 +44,14 @@ namespace mongo { std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() { auto liaison = std::make_unique<ServiceLiaisonMongod>(); - // Set up the logical session cache auto sessionsColl = std::make_shared<SessionsCollectionStandalone>(); return stdx::make_unique<LogicalSessionCacheImpl>( - std::move(liaison), std::move(sessionsColl), nullptr /* reaper */); + std::move(liaison), + std::move(sessionsColl), + [](OperationContext*, SessionsCollection&, Date_t) { + return 0; /* No op*/ + }); } } // namespace mongo diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript index 23a1e9a5019..88db8654426 100644 --- a/src/mongo/s/SConscript +++ b/src/mongo/s/SConscript @@ -106,7 +106,6 @@ env.Library( 'client/sharding_network_connection_hook.cpp', ], LIBDEPS=[ - '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos', '$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl', '$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager', '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl', diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp index 29c490ba8b0..6580faeb0c5 100644 --- a/src/mongo/s/server.cpp +++ b/src/mongo/s/server.cpp @@ -55,13 +55,15 @@ #include "mongo/db/lasterror.h" #include "mongo/db/log_process_details.h" #include "mongo/db/logical_clock.h" -#include "mongo/db/logical_session_cache_factory_mongos.h" +#include "mongo/db/logical_session_cache_impl.h" #include "mongo/db/logical_time_metadata_hook.h" #include "mongo/db/logical_time_validator.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_options.h" #include "mongo/db/service_context.h" +#include "mongo/db/service_liaison_mongos.h" #include "mongo/db/session_killer.h" +#include "mongo/db/sessions_collection_sharded.h" #include "mongo/db/startup_warnings_common.h" #include "mongo/db/wire_version.h" #include 
"mongo/executor/task_executor_pool.h" @@ -84,7 +86,6 @@ #include "mongo/s/query/cluster_cursor_manager.h" #include "mongo/s/service_entry_point_mongos.h" #include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" -#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h" #include "mongo/s/sharding_initialization.h" #include "mongo/s/sharding_uptime_reporter.h" #include "mongo/s/version_mongos.h" @@ -187,6 +188,9 @@ void cleanupTask(ServiceContext* serviceContext) { Client::initThread(getThreadName()); Client& client = cc(); + // Join the logical session cache before the transport layer + LogicalSessionCache::get(serviceContext)->joinOnShutDown(); + // Shutdown the TransportLayer so that new connections aren't accepted if (auto tl = serviceContext->getTransportLayer()) { log(LogComponent::kNetwork) << "shutdown: going to close all sockets..."; @@ -496,8 +500,13 @@ ExitCode runMongosServer(ServiceContext* serviceContext) { SessionKiller::set(serviceContext, std::make_shared<SessionKiller>(serviceContext, killSessionsRemote)); - // Set up the logical session cache - LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheS()); + LogicalSessionCache::set(serviceContext, + stdx::make_unique<LogicalSessionCacheImpl>( + stdx::make_unique<ServiceLiaisonMongos>(), + stdx::make_unique<SessionsCollectionSharded>(), + [](OperationContext*, SessionsCollection&, Date_t) { + return 0; /* No op*/ + })); status = serviceContext->getServiceExecutor()->start(); if (!status.isOK()) { |