summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-09-27 16:54:44 +0000
committerevergreen <evergreen@mongodb.com>2019-09-27 16:54:44 +0000
commit2e165002e4b434e5713d8b7dff8d46151edff85d (patch)
treef942643289c9e7d2d7f5431f3f8da91317110304
parent5c5088b0a184ed6993735ffdb37de5025b57a456 (diff)
downloadmongo-2e165002e4b434e5713d8b7dff8d46151edff85d.tar.gz
SERVER-37837 Get rid of TransactionReaper
Part 1: This change gets rid of the TransactionReaper's usage of the ReplicationCoordinator for checking whether it is primary or not and makes the LogicalSessionCache joinable on shutdown. It also removes the TransactionReaper's grouping per-shard optimization and moves it all under SessionCollectionSharded. (cherry picked from commit 2791817876636c0cfd60d867f31c7a83cf3f18c1) Part 2: This change folds the TransactionReaper's single function to be part of the SessionCatalogs on MongoD and MongoS, which are the subsystems responsible for managing transactions. (cherry picked from commit 94f269a1c6053824c4dabc50e8c9219b80a6a1b5) (cherry picked from commit 7e6a80789cd74f9b533065f57afb5c9221eea1e7)
-rw-r--r--src/mongo/SConscript15
-rw-r--r--src/mongo/db/SConscript74
-rw-r--r--src/mongo/db/commands/SConscript1
-rw-r--r--src/mongo/db/commands/end_sessions_command.cpp12
-rw-r--r--src/mongo/db/db.cpp14
-rw-r--r--src/mongo/db/logical_session_cache.h6
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp24
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongos.cpp53
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongos.h42
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp177
-rw-r--r--src/mongo/db/logical_session_cache_impl.h35
-rw-r--r--src/mongo/db/logical_session_cache_noop.h2
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp10
-rw-r--r--src/mongo/db/logical_session_id_test.cpp6
-rw-r--r--src/mongo/db/namespace_string.cpp2
-rw-r--r--src/mongo/db/namespace_string.h3
-rw-r--r--src/mongo/db/s/SConscript36
-rw-r--r--src/mongo/db/session_catalog.cpp96
-rw-r--r--src/mongo/db/session_catalog.h11
-rw-r--r--src/mongo/db/session_catalog_reap_sessions_test.cpp (renamed from src/mongo/db/transaction_reaper_test.cpp)45
-rw-r--r--src/mongo/db/sessions_collection.cpp11
-rw-r--r--src/mongo/db/sessions_collection.h21
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp68
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp68
-rw-r--r--src/mongo/db/sessions_collection_sharded.h14
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp21
-rw-r--r--src/mongo/db/transaction_reaper.cpp317
-rw-r--r--src/mongo/db/transaction_reaper.h64
-rw-r--r--src/mongo/dbtests/query_stage_update.cpp20
-rw-r--r--src/mongo/embedded/logical_session_cache_factory_embedded.cpp9
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/server.cpp17
32 files changed, 497 insertions, 798 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index 5cde5318eeb..5e5e71e617a 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -297,12 +297,12 @@ env.Library(
'base',
'db/auth/authmongod',
'db/catalog/health_log',
- 'db/commands/mongod',
'db/commands/mongod_fcv',
+ 'db/commands/mongod',
'db/commands/server_status_servers',
'db/dbdirectclient',
- 'db/ftdc/ftdc_mongod',
'db/free_mon/free_mon_mongod',
+ 'db/ftdc/ftdc_mongod',
'db/index_d',
'db/initialize_snmp',
'db/keys_collection_client_direct',
@@ -312,13 +312,15 @@ env.Library(
'db/mongodandmongos',
'db/periodic_runner_job_abort_expired_transactions',
'db/query_exec',
- 'db/repair_database',
'db/repair_database_and_check_version',
+ 'db/repair_database',
'db/repl/repl_set_commands',
'db/repl/storage_interface_impl',
+ 'db/repl/storage_interface',
'db/repl/topology_coordinator',
'db/s/balancer',
'db/s/op_observer_sharding_impl',
+ 'db/s/sharding_commands_d',
'db/s/sharding_runtime_d',
'db/serveronly',
'db/service_context_d',
@@ -410,14 +412,17 @@ mongos = env.Program(
] + env.WindowsResourceFile("s/server.rc"),
LIBDEPS=[
'db/auth/authmongos',
- 'db/commands/server_status',
'db/commands/server_status_core',
'db/commands/server_status_servers',
+ 'db/commands/server_status',
'db/curop',
'db/ftdc/ftdc_mongos',
+ 'db/logical_session_cache_impl',
'db/logical_time_metadata_hook',
'db/mongodandmongos',
'db/server_options',
+ 'db/service_liaison_mongos',
+ 'db/sessions_collection_sharded',
'db/startup_warnings_common',
'db/stats/counters',
's/commands/cluster_commands',
@@ -425,9 +430,9 @@ mongos = env.Program(
's/committed_optime_metadata_hook',
's/coreshard',
's/is_mongos',
+ 's/query/cluster_cursor_cleanup_job',
's/sharding_egress_metadata_hook_for_mongos',
's/sharding_initialization',
- 's/query/cluster_cursor_cleanup_job',
'transport/service_entry_point',
'transport/transport_layer_manager',
'util/clock_sources',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index b7ba27da87c..bf83bcda5a3 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1051,10 +1051,6 @@ env.Library(
'instance.cpp'
],
LIBDEPS=[
- "$BUILD_DIR/mongo/db/bson/dotted_path_support",
- "$BUILD_DIR/mongo/db/logical_time_metadata_hook",
- "$BUILD_DIR/mongo/db/storage/mmap_v1/file_allocator",
- "$BUILD_DIR/mongo/db/ttl_collection_cache",
"$BUILD_DIR/mongo/executor/network_interface_factory",
"$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl",
"$BUILD_DIR/mongo/scripting/scripting_server",
@@ -1063,6 +1059,7 @@ env.Library(
"$BUILD_DIR/mongo/util/net/network",
"$BUILD_DIR/third_party/shim_snappy",
"background",
+ "bson/dotted_path_support",
"catalog/catalog_impl",
"catalog/collection_options",
"catalog/document_validation",
@@ -1072,18 +1069,20 @@ env.Library(
"common",
"concurrency/lock_manager",
"concurrency/write_conflict_exception",
- "curop",
"curop_metrics",
+ "curop",
"db_raii",
"dbdirectclient",
"dbhelpers",
"exec/working_set",
"ftdc/ftdc_mongod",
"fts/ftsmongod",
+ "index_d",
"index/index_access_method",
"index/index_descriptor",
- "index_d",
"introspect",
+ "keys_collection_client_direct",
+ "logical_time_metadata_hook",
"matcher/expressions_mongod_only",
"ops/write_ops_parsers",
"pipeline/aggregation",
@@ -1091,21 +1090,21 @@ env.Library(
"query_exec",
"repair_database",
"repl/bgsync",
+ "repl/oplog_application",
"repl/oplog_buffer_blocking_queue",
"repl/oplog_buffer_collection",
"repl/oplog_buffer_proxy",
- "repl/repl_coordinator_interface",
"repl/repl_coordinator_impl",
+ "repl/repl_coordinator_interface",
"repl/repl_settings",
"repl/rs_rollback",
"repl/rslog",
"repl/serveronly_repl",
"repl/storage_interface",
- "repl/oplog_application",
"repl/topology_coordinator",
"rw_concern_d",
- "s/commands_db_s",
"s/op_observer_sharding_impl",
+ "s/sharding_commands_d",
"s/sharding_runtime_d",
"startup_warnings_mongod",
"stats/counters",
@@ -1113,6 +1112,7 @@ env.Library(
"stats/top",
"storage/devnull/storage_devnull",
"storage/ephemeral_for_test/storage_ephemeral_for_test",
+ "storage/mmap_v1/file_allocator",
"storage/mmap_v1/mmap",
"storage/mmap_v1/storage_mmapv1",
"storage/storage_engine_lock_file",
@@ -1120,11 +1120,11 @@ env.Library(
"storage/storage_init_d",
"storage/storage_options",
"storage/wiredtiger/storage_wiredtiger" if wiredtiger else [],
+ "ttl_collection_cache",
"ttl_d",
- "update/update_driver",
"update_index_data",
+ "update/update_driver",
"views/views_mongod",
- 'keys_collection_client_direct',
],
)
@@ -1360,7 +1360,6 @@ env.Library(
'initialize_operation_session_info.cpp',
'logical_session_cache_impl.cpp',
'logical_session_server_status_section.cpp',
- env.Idlc('commands/end_sessions.idl')[0],
],
LIBDEPS=[
'logical_session_cache',
@@ -1378,18 +1377,13 @@ env.Library(
)
env.Library(
- target='transaction_reaper',
+ target='shared_request_handling',
source=[
- 'transaction_reaper.cpp',
+ 'handle_request_response.cpp',
+ 'transaction_validation.cpp',
],
- LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
- '$BUILD_DIR/mongo/s/client/shard_interface',
- '$BUILD_DIR/mongo/s/coreshard',
- 'dbdirectclient',
- 'logical_session_id',
- 'sessions_collection',
- 'write_ops',
+ LIBDEPS=[
+ 'logical_session_cache_impl',
],
)
@@ -1397,7 +1391,6 @@ envWithAsio.CppUnitTest(
target='logical_session_cache_test',
source=[
'logical_session_cache_test.cpp',
- 'transaction_reaper_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
@@ -1414,7 +1407,6 @@ envWithAsio.CppUnitTest(
'service_context_d_test_fixture',
'service_liaison_mock',
'sessions_collection_mock',
- 'transaction_reaper',
],
)
@@ -1424,7 +1416,6 @@ envWithAsio.Library(
'logical_session_cache_factory_mongod.cpp',
],
LIBDEPS=[
- 'logical_session_cache',
'logical_session_cache_impl',
'service_liaison_mongod',
'sessions_collection_config_server',
@@ -1432,22 +1423,6 @@ envWithAsio.Library(
'sessions_collection_sharded',
'sessions_collection_standalone',
],
- LIBDEPS_PRIVATE=[
- 'transaction_reaper',
- ],
-)
-
-envWithAsio.Library(
- target='logical_session_cache_factory_mongos',
- source=[
- 'logical_session_cache_factory_mongos.cpp',
- ],
- LIBDEPS=[
- 'logical_session_cache',
- 'logical_session_cache_impl',
- 'service_liaison_mongos',
- 'sessions_collection_sharded',
- ],
)
env.Library(
@@ -1842,14 +1817,17 @@ env.Library(
env.CppUnitTest(
target='sessions_test',
source=[
+ 'session_catalog_reap_sessions_test.cpp',
'session_catalog_test.cpp',
'session_test.cpp',
],
- LIBDEPS=[
+ LIBDEPS_PRIVATE=[
+ '$BUILD_DIR/mongo/client/read_preference',
+ '$BUILD_DIR/mongo/util/clock_source_mock',
+ 'auth/authmocks',
'query_exec',
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
- '$BUILD_DIR/mongo/client/read_preference'
+ 'repl/mock_repl_coord_server_fixture',
+ 'sessions_collection_mock',
],
)
@@ -1858,10 +1836,10 @@ env.CppUnitTest(
source=[
'transaction_history_iterator_test.cpp',
],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
+ LIBDEPS_PRIVATE=[
+ 'auth/authmocks',
'query_exec',
+ 'repl/mock_repl_coord_server_fixture',
'service_context_d',
'write_ops',
],
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index a6beb415bf0..fa520025305 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -105,6 +105,7 @@ env.Library(
"refresh_sessions_command.cpp",
"rename_collection_common.cpp",
"start_session_command.cpp",
+ env.Idlc('end_sessions.idl')[0],
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp
index 9805ea3f978..9d67e6aadb3 100644
--- a/src/mongo/db/commands/end_sessions_command.cpp
+++ b/src/mongo/db/commands/end_sessions_command.cpp
@@ -34,11 +34,13 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/end_sessions_gen.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
namespace mongo {
+namespace {
class EndSessionsCommand final : public BasicCommand {
MONGO_DISALLOW_COPYING(EndSessionsCommand);
@@ -73,10 +75,10 @@ public:
}
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
auto lsCache = LogicalSessionCache::get(opCtx);
auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj);
@@ -84,6 +86,8 @@ public:
lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx));
return true;
}
+
} endSessionsCommand;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 78d59723707..ebc0fcae888 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -544,7 +544,11 @@ ExitCode _initAndListen(int listenPort) {
auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get())
->initializeShardingAwarenessIfNeeded(startupOpCtx.get());
if (shardingInitialized) {
- waitForShardRegistryReload(startupOpCtx.get()).transitional_ignore();
+ auto status = waitForShardRegistryReload(startupOpCtx.get());
+ if (!status.isOK()) {
+ LOG(0) << "Failed to load the shard registry as part of startup"
+ << causedBy(redact(status));
+ }
}
auto storageEngine = serviceContext->getStorageEngine();
@@ -647,8 +651,7 @@ ExitCode _initAndListen(int listenPort) {
kind = LogicalSessionCacheServer::kReplicaSet;
}
- auto sessionCache = makeLogicalSessionCacheD(kind);
- LogicalSessionCache::set(serviceContext, std::move(sessionCache));
+ LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheD(kind));
// MessageServer::run will return when exit code closes its socket and we don't need the
// operation context anymore
@@ -922,6 +925,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
balancer->waitForBalancerToStop();
}
+ // Join the logical session cache before the transport layer.
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
// Shutdown the TransportLayer so that new connections aren't accepted
if (auto tl = serviceContext->getTransportLayer()) {
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets...";
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index a2d30b33164..3f214153a10 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -33,7 +33,6 @@
#include <boost/optional.hpp>
#include "mongo/base/status.h"
-#include "mongo/db/commands/end_sessions_gen.h"
#include "mongo/db/logical_session_cache_stats_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -59,6 +58,11 @@ public:
virtual ~LogicalSessionCache() = 0;
/**
+ * Invoked on service shutdown time in order to join the cache's refresher and reaper tasks.
+ */
+ virtual void joinOnShutDown() = 0;
+
+ /**
* If the cache contains a record for this LogicalSessionId, promotes that lsid
* to be the most recently used and updates its lastUse date to be the current
* time. Returns an error if the session was not found.
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 0b7544adc18..b75bf72866b 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -32,24 +32,22 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/db/logical_session_cache_factory_mongod.h"
#include "mongo/db/logical_session_cache_impl.h"
#include "mongo/db/service_liaison_mongod.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/sessions_collection_config_server.h"
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/sessions_collection_standalone.h"
-#include "mongo/db/transaction_reaper.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) {
- auto liaison = stdx::make_unique<ServiceLiaisonMongod>();
+ auto liaison = std::make_unique<ServiceLiaisonMongod>();
auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> {
switch (state) {
@@ -66,22 +64,8 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach
MONGO_UNREACHABLE;
}();
- auto reaper = [&]() -> std::shared_ptr<TransactionReaper> {
- switch (state) {
- case LogicalSessionCacheServer::kSharded:
- return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl);
- case LogicalSessionCacheServer::kReplicaSet:
- return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl);
- case LogicalSessionCacheServer::kConfigServer:
- case LogicalSessionCacheServer::kStandalone:
- return nullptr;
- }
-
- MONGO_UNREACHABLE;
- }();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), std::move(reaper));
+ return std::make_unique<LogicalSessionCacheImpl>(
+ std::move(liaison), std::move(sessionsColl), SessionCatalog::reapSessionsOlderThan);
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp
deleted file mode 100644
index 2f3c297b202..00000000000
--- a/src/mongo/db/logical_session_cache_factory_mongos.cpp
+++ /dev/null
@@ -1,53 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/logical_session_cache_factory_mongos.h"
-
-#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/db/service_liaison_mongos.h"
-#include "mongo/db/sessions_collection_sharded.h"
-#include "mongo/stdx/memory.h"
-
-namespace mongo {
-
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() {
- auto liaison = stdx::make_unique<ServiceLiaisonMongos>();
- auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.h b/src/mongo/db/logical_session_cache_factory_mongos.h
deleted file mode 100644
index ae884b2cf0e..00000000000
--- a/src/mongo/db/logical_session_cache_factory_mongos.h
+++ /dev/null
@@ -1,42 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/service_liaison.h"
-
-namespace mongo {
-
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS();
-
-} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index c2b282baf3f..81bd2f95a93 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -46,7 +46,6 @@
#include "mongo/util/scopeguard.h"
namespace mongo {
-
namespace {
void clearShardingOperationFailedStatus(OperationContext* opCtx) {
@@ -58,24 +57,40 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) {
} // namespace
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(
- logicalSessionRefreshMillis,
- int,
- LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh.count());
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRefreshMillis,
+ int,
+ durationCount<Milliseconds>(Minutes{5}));
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false);
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(maxSessions, int, 1'000'000);
-constexpr Milliseconds LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
-
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(
- std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper)
+/**
+* The minimum lifetime for a transaction record is how long it has to have lived on the server
+* before we'll consider it for cleanup. This is effectively the window for how long it is
+* permissible for a mongos to hang before we're willing to accept a failure of the retryable write
+* subsystem.
+*
+* Specifically, we imagine that a client connects to one mongos on a session and performs a
+* retryable write. That mongos hangs. Then the client connects to a new mongos on the same
+* session and successfully executes its write. After a day passes, the session will time out,
+* cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
+* executes the write (because all records of the session + transaction have been deleted).
+*
+* So the write is performed twice, which is unavoidable without losing session vivification and/or
+* requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
+* guarantee after the minimum transaction lifetime.
+*/
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
+ int,
+ durationCount<Minutes>(Minutes{30}));
+
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)) {
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -84,26 +99,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
[this](Client* client) { _periodicRefresh(client); },
Milliseconds(logicalSessionRefreshMillis)});
- if (_transactionReaper) {
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- Milliseconds(logicalSessionRefreshMillis)});
- }
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread, log but swallow the
- // error since there is no good way to recover
- severe() << "Failed to join background service thread";
- }
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
@@ -178,7 +189,7 @@ Date_t LogicalSessionCacheImpl::now() {
}
size_t LogicalSessionCacheImpl::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
}
@@ -200,13 +211,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) {
}
Status LogicalSessionCacheImpl::_reap(Client* client) {
- if (!_transactionReaper) {
- return Status::OK();
- }
-
// Take the lock to update some stats.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the last set of stats for our new run.
_stats.setLastTransactionReaperJobDurationMillis(0);
@@ -217,19 +224,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
- int numReaped = 0;
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- try {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ int numReaped = 0;
+ try {
ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
@@ -247,20 +254,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- numReaped = _transactionReaper->reap(opCtx);
- } catch (...) {
+ numReaped =
+ _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ opCtx->getServiceContext()->getFastClockSource()->now() -
+ Minutes(TransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
- return exceptionToStatus();
+ return ex.toStatus();
}
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
@@ -272,7 +282,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
void LogicalSessionCacheImpl::_refresh(Client* client) {
// Stats for serverStatus:
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the refresh-related stats with the beginning of our run.
_stats.setLastSessionsCollectionJobDurationMillis(0);
@@ -287,7 +297,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = MakeGuard([this] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -317,38 +327,35 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
LogicalSessionIdSet explicitlyEndingSessions;
LogicalSessionIdMap<LogicalSessionRecord> activeSessions;
- // backSwapper creates a guard that in the case of a exception
- // replaces the ending or active sessions that swapped out of of LogicalSessionCache,
- // and merges in any records that had been added since we swapped them
- // out.
- auto backSwapper = [this](auto& member, auto& temp) {
- return MakeGuard([this, &member, &temp] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
- using std::swap;
- swap(member, temp);
- for (const auto& it : temp) {
- member.emplace(it);
- }
- });
- };
-
{
using std::swap;
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
swap(explicitlyEndingSessions, _endingSessions);
swap(activeSessions, _activeSessions);
}
- auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions);
- auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions);
+
+ // Create guards that in the case of an exception replace the ending or active sessions that
+ // swapped out of LogicalSessionCache, and merge in any records that had been added since we
+ // swapped them out.
+ auto backSwap = [this](auto& member, auto& temp) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ using std::swap;
+ swap(member, temp);
+ for (const auto& it : temp) {
+ member.emplace(it);
+ }
+ };
+ auto activeSessionsBackSwapper = MakeGuard([&] { backSwap(_activeSessions, activeSessions); });
+ auto explicitlyEndingBackSwaper =
+ MakeGuard([&] { backSwap(_endingSessions, explicitlyEndingSessions); });
// remove all explicitlyEndingSessions from activeSessions
for (const auto& lsid : explicitlyEndingSessions) {
activeSessions.erase(lsid);
}
- // refresh all recently active sessions as well as for sessions attached to running ops
-
- LogicalSessionRecordSet activeSessionRecords{};
+ // Refresh all recently active sessions as well as for sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
auto runningOpSessions = _service->getActiveOpSessions();
@@ -367,7 +374,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.Dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
}
@@ -375,20 +382,18 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.Dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
}
// Find which running, but not recently active sessions, are expired, and add them
// to the list of sessions to kill cursors for
-
- KillAllSessionsByPatternSet patterns;
-
auto openCursorSessions = _service->getOpenCursorSessions();
+
// Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
// killing cursors on the removed sessions and creating sessions.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
for (const auto& it : _activeSessions) {
auto newSessionIt = openCursorSessions.find(it.first);
@@ -398,9 +403,11 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
}
}
- // think about pruning ending and active out of openCursorSessions
+ // Think about pruning ending and active out of openCursorSessions
auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions);
+ KillAllSessionsByPatternSet patterns;
+
if (statusAndRemovedSessions.isOK()) {
auto removedSessions = statusAndRemovedSessions.getValue();
for (const auto& lsid : removedSessions) {
@@ -418,33 +425,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
SessionKiller::Matcher matcher(std::move(patterns));
auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
}
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_endingSessions.insert(begin(sessions), end(sessions));
}
LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setActiveSessionsCount(_activeSessions.size());
return _stats;
}
Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
}
+
_activeSessions.insert(std::make_pair(record.getId(), record));
return Status::OK();
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
ret.reserve(_activeSessions.size());
for (const auto& id : _activeSessions) {
@@ -455,7 +463,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
@@ -468,11 +476,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto it = _activeSessions.find(id);
if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
}
+
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 9158da1397a..e1e3353b579 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -31,22 +31,11 @@
#pragma once
#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/refresh_sessions_gen.h"
#include "mongo/db/service_liaison.h"
#include "mongo/db/sessions_collection.h"
-#include "mongo/db/time_proof_service.h"
-#include "mongo/db/transaction_reaper.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/lru_cache.h"
namespace mongo {
-class Client;
-class OperationContext;
-class ServiceContext;
-
/**
* A thread-safe cache structure for logical session records.
*
@@ -63,20 +52,20 @@ class ServiceContext;
*/
class LogicalSessionCacheImpl final : public LogicalSessionCache {
public:
- static constexpr Milliseconds kLogicalSessionDefaultRefresh = Milliseconds(5 * 60 * 1000);
+ using ReapSessionsOlderThanFn =
+ stdx::function<int(OperationContext*, SessionsCollection&, Date_t)>;
- /**
- * Construct a new session cache.
- */
LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper);
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn);
LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete;
LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete;
~LogicalSessionCacheImpl();
+ void joinOnShutDown() override;
+
Status promote(LogicalSessionId lsid) override;
Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
@@ -128,23 +117,19 @@ private:
*/
Status _addToCache(LogicalSessionRecord record);
- // This value is only modified under the lock, and is modified
- // automatically by the background jobs.
- LogicalSessionCacheStats _stats;
-
std::unique_ptr<ServiceLiaison> _service;
std::shared_ptr<SessionsCollection> _sessionsColl;
+ ReapSessionsOlderThanFn _reapSessionsOlderThanFn;
- mutable stdx::mutex _reaperMutex;
- std::shared_ptr<TransactionReaper> _transactionReaper;
-
- mutable stdx::mutex _cacheMutex;
+ mutable stdx::mutex _mutex;
LogicalSessionIdMap<LogicalSessionRecord> _activeSessions;
LogicalSessionIdSet _endingSessions;
- Date_t lastRefreshTime;
+ Date_t _lastRefreshTime;
+
+ LogicalSessionCacheStats _stats;
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index 771756393da..e93bfc3f672 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -43,6 +43,8 @@ class ServiceContext;
*/
class LogicalSessionCacheNoop : public LogicalSessionCache {
public:
+ void joinOnShutDown() override {}
+
Status promote(LogicalSessionId lsid) override {
return Status::OK();
}
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index 8bbf986d638..fd0edc31cab 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -54,8 +54,8 @@
namespace mongo {
namespace {
-const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout);
-const Milliseconds kForceRefresh{LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh};
+const Minutes kSessionTimeout{30};
+const Minutes kForceRefresh{5};
using SessionList = std::list<LogicalSessionId>;
using unittest::EnsureFCV;
@@ -80,7 +80,11 @@ public:
auto mockService = stdx::make_unique<MockServiceLiaison>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache = stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(mockService), std::move(mockSessions), nullptr /* reaper */);
+ std::move(mockService),
+ std::move(mockSessions),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+            return 0; /* No-op */
+ });
}
void waitUntilRefreshScheduled() {
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index 3d90d189d5b..c9729c163c4 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -93,7 +93,11 @@ public:
std::make_shared<MockSessionsCollectionImpl>());
auto localLogicalSessionCache = std::make_unique<LogicalSessionCacheImpl>(
- std::move(localServiceLiaison), std::move(localSessionsCollection), nullptr);
+ std::move(localServiceLiaison),
+ std::move(localSessionsCollection),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+            return 0; /* No-op */
+ });
LogicalSessionCache::set(getServiceContext(), std::move(localLogicalSessionCache));
}
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index cc27f325639..85b28afd6e8 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -56,6 +56,8 @@ constexpr StringData NamespaceString::kOrphanCollectionDb;
const NamespaceString NamespaceString::kServerConfigurationNamespace(NamespaceString::kAdminDb,
"system.version");
+const NamespaceString NamespaceString::kLogicalSessionsNamespace(NamespaceString::kConfigDb,
+ "system.sessions");
const NamespaceString NamespaceString::kSessionTransactionsTableNamespace(
NamespaceString::kConfigDb, "transactions");
const NamespaceString NamespaceString::kShardConfigCollectionsNamespace(NamespaceString::kConfigDb,
diff --git a/src/mongo/db/namespace_string.h b/src/mongo/db/namespace_string.h
index 0ab006981b3..85462caf35a 100644
--- a/src/mongo/db/namespace_string.h
+++ b/src/mongo/db/namespace_string.h
@@ -73,6 +73,9 @@ public:
// represents. For example, 'shardIdentity' and 'featureCompatibilityVersion'.
static const NamespaceString kServerConfigurationNamespace;
+ // Namespace for storing the logical sessions information
+ static const NamespaceString kLogicalSessionsNamespace;
+
// Namespace for storing the transaction information for each session
static const NamespaceString kSessionTransactionsTableNamespace;
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index dc56763d8a3..a1e9bfcc5f9 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -218,7 +218,7 @@ env.Library(
)
env.Library(
- target='commands_db_s',
+ target='sharding_commands_d',
source=[
'check_sharding_index_command.cpp',
'cleanup_orphaned_cmd.cpp',
@@ -307,20 +307,25 @@ env.CppUnitTest(
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
+ 'session_catalog_migration_destination_test.cpp',
+ 'session_catalog_migration_source_test.cpp',
'shard_metadata_util_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
'sharding_initialization_mongod_test.cpp',
'sharding_initialization_op_observer_test.cpp',
'split_vector_test.cpp',
],
- LIBDEPS=[
- 'sharding_runtime_d',
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/logical_session_cache_impl',
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/shard_server_test_fixture',
+ 'sharding_runtime_d',
],
)
@@ -345,31 +350,6 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='session_catalog_migration_source_test',
- source=[
- 'session_catalog_migration_source_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
- ]
-)
-
-env.CppUnitTest(
- target='session_catalog_migration_destination_test',
- source=[
- 'session_catalog_migration_destination_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/ops/write_ops_exec',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
- '$BUILD_DIR/mongo/s/shard_server_test_fixture',
- 'sharding_runtime_d',
- ]
-)
-
-env.CppUnitTest(
target='sharding_catalog_manager_test',
source=[
'config/initial_split_policy_test.cpp',
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 0781349eae4..efb93229584 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -38,11 +38,15 @@
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/dbdirectclient.h"
#include "mongo/db/kill_sessions_common.h"
#include "mongo/db/namespace_string.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_txn_record_gen.h"
#include "mongo/rpc/get_status_from_command_result.h"
+#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -54,6 +58,64 @@ const auto sessionTransactionTableDecoration = ServiceContext::declareDecoration
const auto operationSessionDecoration =
OperationContext::declareDecoration<boost::optional<ScopedCheckedOutSession>>();
+const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
+
+/**
+ * Removes the transaction records (from 'config.transactions') for those of the specified session
+ * ids which are no longer present in the sessions collection and returns the number removed.
+ */
+int removeSessionsTransactionRecords(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ const LogicalSessionIdSet& sessionIdsToRemove) {
+ if (sessionIdsToRemove.empty())
+ return 0;
+
+ // From the passed-in sessions, find the ones which are actually expired/removed
+ auto expiredSessionIds =
+ uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
+
+ if (expiredSessionIds.empty())
+ return 0;
+
+ // Remove the session ids from the on-disk catalog
+ write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ deleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (const auto& lsid : expiredSessionIds) {
+ entries.emplace_back([&] {
+ write_ops::DeleteOpEntry entry;
+ entry.setQ(BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()));
+ entry.setMulti(false);
+ return entry;
+ }());
+ }
+ return entries;
+ }());
+
+ BSONObj result;
+
+ DBDirectClient client(opCtx);
+ client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+ deleteOp.toBSON({}),
+ result);
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse response " << result,
+ response.parseBSON(result, &errmsg));
+ uassertStatusOK(response.getTopLevelStatus());
+
+ return response.getN();
+}
+
} // namespace
SessionCatalog::~SessionCatalog() {
@@ -89,6 +151,40 @@ boost::optional<UUID> SessionCatalog::getTransactionTableUUID(OperationContext*
return coll->uuid();
}
+int SessionCatalog::reapSessionsOlderThan(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired) {
+ // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index
+ DBDirectClient client(opCtx);
+ auto cursor =
+ client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ Query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)).sort(kSortById),
+ 0,
+ 0,
+ &kIdProjection);
+
+ // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object size
+ // limit
+ const int kMaxBatchSize = 10'000;
+
+ LogicalSessionIdSet lsids;
+ int numReaped = 0;
+ while (cursor->more()) {
+ auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
+ "TransactionSession"_sd, cursor->next());
+
+ lsids.insert(transactionSession.get_id());
+ if (lsids.size() > kMaxBatchSize) {
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+ lsids.clear();
+ }
+ }
+
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+
+ return numReaped;
+}
+
void SessionCatalog::onStepUp(OperationContext* opCtx) {
invalidateSessions(opCtx, boost::none);
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 197a344e55a..4730b0c5b44 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -34,6 +34,7 @@
#include "mongo/db/logical_session_id.h"
#include "mongo/db/session.h"
#include "mongo/db/session_killer.h"
+#include "mongo/db/sessions_collection.h"
#include "mongo/stdx/condition_variable.h"
#include "mongo/stdx/mutex.h"
#include "mongo/stdx/unordered_map.h"
@@ -71,6 +72,16 @@ public:
static boost::optional<UUID> getTransactionTableUUID(OperationContext* opCtx);
/**
+ * Locates session entries from the in-memory catalog and in 'config.transactions' which have
+ * not been referenced since 'possiblyExpired' and deletes them.
+ *
+ * Returns the number of sessions which were reaped from the persisted store on disk.
+ */
+ static int reapSessionsOlderThan(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired);
+
+ /**
* Resets the transaction table to an uninitialized state.
* Meant only for testing.
*/
diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/session_catalog_reap_sessions_test.cpp
index fe78891ef13..69608d909ae 100644
--- a/src/mongo/db/transaction_reaper_test.cpp
+++ b/src/mongo/db/session_catalog_reap_sessions_test.cpp
@@ -32,15 +32,15 @@
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context_d_test_fixture.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection_mock.h"
-#include "mongo/db/transaction_reaper.h"
#include "mongo/util/clock_source_mock.h"
namespace mongo {
namespace {
-class TransactionReaperTest : public ServiceContextMongoDTest {
+class SessionCatalogReapSessionsTest : public ServiceContextMongoDTest {
protected:
void setUp() override {
const auto service = getServiceContext();
@@ -61,26 +61,12 @@ protected:
std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{
std::make_shared<MockSessionsCollectionImpl>()};
- std::unique_ptr<TransactionReaper> _reaper{
- TransactionReaper::make(TransactionReaper::Type::kReplicaSet,
- std::make_shared<MockSessionsCollection>(_collectionMock))};
+ std::shared_ptr<SessionsCollection> _collection{
+ std::make_shared<MockSessionsCollection>(_collectionMock)};
};
-TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
- _collectionMock->add([&] {
- LogicalSessionRecord rec;
- rec.setId(makeLogicalSessionIdForTest());
- rec.setLastUse(clock()->now());
- return rec;
- }());
-
- _collectionMock->add([&] {
- LogicalSessionRecord rec;
- rec.setId(makeLogicalSessionIdForTest());
- rec.setLastUse(clock()->now());
- return rec;
- }());
-
+TEST_F(SessionCatalogReapSessionsTest, ReapSomeExpiredSomeNot) {
+ // Create some "old" sessions
DBDirectClient client(_opCtx);
SessionTxnRecord txn1;
txn1.setSessionId(makeLogicalSessionIdForTest());
@@ -96,8 +82,25 @@ TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
std::vector<BSONObj>{txn1.toBSON(), txn2.toBSON()});
+ // Add some "new" sessions to ensure they don't get reaped
clock()->advance(Minutes{31});
- ASSERT_EQ(2, _reaper->reap(_opCtx));
+ _collectionMock->add([&] {
+ LogicalSessionRecord rec;
+ rec.setId(makeLogicalSessionIdForTest());
+ rec.setLastUse(clock()->now());
+ return rec;
+ }());
+
+ _collectionMock->add([&] {
+ LogicalSessionRecord rec;
+ rec.setId(makeLogicalSessionIdForTest());
+ rec.setLastUse(clock()->now());
+ return rec;
+ }());
+
+ ASSERT_EQ(
+ 2,
+ SessionCatalog::reapSessionsOlderThan(_opCtx, *_collection, clock()->now() - Minutes{30}));
}
} // namespace
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index e4bad86e337..9bb47a9b911 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -216,7 +216,7 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N
}
Status SessionsCollection::doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("update", ns.coll());
@@ -234,7 +234,7 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns,
}
Status SessionsCollection::doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("delete", ns.coll());
@@ -249,16 +249,15 @@ Status SessionsCollection::doRemove(const NamespaceString& ns,
return runBulkCmd("deletes", init, add, send, sessions);
}
-StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send) {
+StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved(
+ const NamespaceString& ns, const std::vector<LogicalSessionId>& sessions, FindBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionId>{}; };
auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) {
batch.push_back(record);
};
- LogicalSessionIdSet removed = sessions;
+ LogicalSessionIdSet removed{sessions.begin(), sessions.end()};
auto wrappedSend = [&](BSONObj batch) {
auto swBatchResult = send(batch);
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 5daabee3d82..206e37c3dfb 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -104,31 +104,34 @@ protected:
* Makes a send function for the given client.
*/
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
- SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
- SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+
using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>;
- FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
*/
Status doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send);
/**
* Formats and sends batches of deletes for the given set of sessions.
*/
Status doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send);
/**
- * Formats and sends batches of fetches for the given set of sessions.
+ * Returns those lsids from the input 'sessions' array which are not present in the sessions
+ * collection (essentially performs an anti-join of 'sessions' against the sessions
+ * collection).
*/
- StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send);
+ StatusWith<LogicalSessionIdSet> doFindRemoved(const NamespaceString& ns,
+ const std::vector<LogicalSessionId>& sessions,
+ FindBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index c0c33539772..76f6814dbce 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -209,54 +209,66 @@ Status SessionsCollectionRS::checkSessionsCollectionExists(OperationContext* opC
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
- return dispatch(kSessionsNamespaceString,
+ const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end());
+
+ return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRefresh(
- kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(
+ NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
- return doRefresh(kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, client));
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(
+ NamespaceString::kLogicalSessionsNamespace, client));
});
}
Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
- return dispatch(kSessionsNamespaceString,
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
+ return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
- return doRemove(kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(
+ NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
- return doRemove(kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, client));
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(
+ NamespaceString::kLogicalSessionsNamespace, client));
});
}
StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
- return dispatch(kSessionsNamespaceString,
- opCtx,
- [&] {
- DBDirectClient client(opCtx);
- return doFetch(kSessionsNamespaceString,
- sessions,
- makeFindFnForCommand(kSessionsNamespaceString, &client));
- },
- [&](DBClientBase* client) {
- return doFetch(kSessionsNamespaceString,
- sessions,
- makeFindFnForCommand(kSessionsNamespaceString, client));
- });
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
+ return dispatch(
+ NamespaceString::kLogicalSessionsNamespace,
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
+ },
+ [&](DBClientBase* client) {
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, client));
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index bae7f3ef8c6..035337ca619 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -40,6 +40,8 @@
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/rpc/op_msg.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont
return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"};
}
+std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace.ns()
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionIdsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionId> sessionIdsGroupedByShard;
+ sessionIdsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionIdsByOwningShard) {
+ sessionIdsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionIdsGroupedByShard;
+}
+
+std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace.ns()
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard;
+ sessionRecordsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionsByOwningShard) {
+ sessionRecordsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionRecordsGroupedByShard;
+}
+
Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) {
return checkSessionsCollectionExists(opCtx);
}
@@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return response.toStatus();
};
- return doRefresh(kSessionsNamespaceString, sessions, send);
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionRecordsByOwningShard(opCtx, sessions),
+ send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return response.toStatus();
};
- return doRemove(kSessionsNamespaceString, sessions, send);
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -161,7 +221,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return result.obj();
};
- return doFetch(kSessionsNamespaceString, sessions, send);
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index 112c56aae26..1021f78e9aa 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -74,6 +74,20 @@ public:
protected:
Status _checkCacheForSessionsCollection(OperationContext* opCtx);
+
+ /**
+ * These two methods use the sharding routing metadata to do a best effort attempt at grouping
+ * the specified set of sessions by the shards which have the records for these sessions. This
+ * is done as an attempt to avoid broadcast queries.
+ *
+ * The reason it is 'best effort' is because it makes no attempt at checking whether the routing
+ * table is up-to-date and just picks up whatever was most recently fetched from the config
+ * server, which could be stale.
+ */
+ std::vector<LogicalSessionId> _groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions);
+ std::vector<LogicalSessionRecord> _groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 67d61e0c8a2..963209cd577 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -100,26 +100,29 @@ Status SessionsCollectionStandalone::checkSessionsCollectionExists(OperationCont
Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
+ const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end());
DBDirectClient client(opCtx);
- return doRefresh(kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
DBDirectClient client(opCtx);
- return doRemove(kSessionsNamespaceString,
- sessions,
- makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
DBDirectClient client(opCtx);
- return doFetch(kSessionsNamespaceString,
- sessions,
- makeFindFnForCommand(kSessionsNamespaceString, &client));
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
deleted file mode 100644
index b50f7eecc77..00000000000
--- a/src/mongo/db/transaction_reaper.cpp
+++ /dev/null
@@ -1,317 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/transaction_reaper.h"
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/server_parameters.h"
-#include "mongo/db/session_txn_record_gen.h"
-#include "mongo/db/sessions_collection.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace {
-
-constexpr Minutes kTransactionRecordMinimumLifetime(30);
-
-/**
- * The minimum lifetime for a transaction record is how long it has to have lived on the server
- * before we'll consider it for cleanup. This is effectively the window for how long it is
- * permissible for a mongos to hang before we're willing to accept a failure of the retryable write
- * subsystem.
- *
- * Specifically, we imagine that a client connects to one mongos on a session and performs a
- * retryable write. That mongos hangs. Then the client connects to a new mongos on the same
- * session and successfully executes its write. After a day passes, the session will time out,
- * cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
- * executes the write (because all records of the session + transaction have been deleted).
- *
- * So the write is performed twice, which is unavoidable without losing session vivification and/or
- * requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
- * guarantee after the minimum transaction lifetime.
- */
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(TransactionRecordMinimumLifetimeMinutes,
- int,
- kTransactionRecordMinimumLifetime.count());
-
-const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
-
-/**
- * Makes the query we'll use to scan the transactions table.
- *
- * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt
- * to pull records likely to be on the same chunks (because they sort near each other).
- */
-Query makeQuery(Date_t now) {
- const Date_t possiblyExpired(now - Minutes(TransactionRecordMinimumLifetimeMinutes));
- Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
- query.sort(kSortById);
- return query;
-}
-
-/**
- * Our impl is templatized on a type which handles the lsids we see. It provides the top level
- * scaffolding for figuring out if we're the primary node responsible for the transaction table and
- * invoking the handler.
- *
- * The handler here will see all of the possibly expired txn ids in the transaction table and will
- * have a lifetime associated with a single call to reap.
- */
-template <typename Handler>
-class TransactionReaperImpl final : public TransactionReaper {
-public:
- TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
- : _collection(std::move(collection)) {}
-
- int reap(OperationContext* opCtx) override {
- Handler handler(opCtx, *_collection);
- if (!handler.initialize()) {
- return 0;
- }
-
- // Make a best-effort attempt to only reap when the node is running as a primary
- const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (!coord->canAcceptWritesForDatabase_UNSAFE(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
- return 0;
- }
-
- DBDirectClient client(opCtx);
-
- // Fill all stale config.transactions entries
- auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
- auto cursor = client.query(
- NamespaceString::kSessionTransactionsTableNamespace.ns(), query, 0, 0, &kIdProjection);
-
- while (cursor->more()) {
- auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
- "TransactionSession"_sd, cursor->next());
-
- handler.handleLsid(transactionSession.get_id());
- }
-
- // Before the handler goes out of scope, flush its last batch to disk and collect stats.
- return handler.finalize();
- }
-
-private:
- std::shared_ptr<SessionsCollection> _collection;
-};
-
-/**
- * Removes the specified set of session ids from the persistent sessions collection and returns the
- * number of sessions actually removed.
- */
-int removeSessionsTransactionRecords(OperationContext* opCtx,
- SessionsCollection& sessionsCollection,
- const LogicalSessionIdSet& sessionIdsToRemove) {
- if (sessionIdsToRemove.empty()) {
- return 0;
- }
-
- // From the passed-in sessions, find the ones which are actually expired/removed
- auto expiredSessionIds =
- uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
-
- DBDirectClient client(opCtx);
- int numDeleted = 0;
-
- for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) {
- write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
- deleteOp.setWriteCommandBase([] {
- write_ops::WriteCommandBase base;
- base.setOrdered(false);
- return base;
- }());
- deleteOp.setDeletes([&] {
- // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
- // size limit
- const int kMaxBatchSize = 10'000;
- std::vector<write_ops::DeleteOpEntry> entries;
- for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) {
- entries.emplace_back([&] {
- write_ops::DeleteOpEntry entry;
- entry.setQ(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()));
- entry.setMulti(false);
- return entry;
- }());
- }
-
- return entries;
- }());
-
- BSONObj result;
- client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
- deleteOp.toBSON({}),
- result);
-
- BatchedCommandResponse response;
- std::string errmsg;
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse response " << result,
- response.parseBSON(result, &errmsg));
- uassertStatusOK(response.getTopLevelStatus());
-
- numDeleted += response.getN();
- }
-
- return numDeleted;
-}
-
-/**
- * The repl impl is simple, just pass along to the sessions collection for checking ids locally
- */
-class ReplHandler {
-public:
- ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- bool initialize() {
- return true;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- _batch.insert(lsid);
-
- if (_batch.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- _batch.clear();
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- LogicalSessionIdSet _batch;
-
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-/**
- * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small
- * scans.
- */
-class ShardedHandler {
-public:
- ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- // Returns false if the sessions collection is not set up.
- bool initialize() {
- auto routingInfo =
- uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
- _opCtx, SessionsCollection::kSessionsNamespaceString));
- _cm = routingInfo.cm();
- return !!_cm;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- invariant(_cm);
-
- // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain
- // batches of lsids, which only fall on the same shard, so that the query to check whether
- // they are alive doesn't need to do cross-shard scatter/gather queries
- const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
- const auto& shardId = chunk.getShardId();
-
- auto& lsids = _shards[shardId];
- lsids.insert(lsid);
-
- if (lsids.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids);
- _shards.erase(shardId);
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- for (const auto& pair : _shards) {
- _numReaped +=
- removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second);
- }
-
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- std::shared_ptr<ChunkManager> _cm;
-
- stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-} // namespace
-
-std::unique_ptr<TransactionReaper> TransactionReaper::make(
- Type type, std::shared_ptr<SessionsCollection> collection) {
- switch (type) {
- case Type::kReplicaSet:
- return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection));
- case Type::kSharded:
- return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection));
- }
- MONGO_UNREACHABLE;
-}
-
-TransactionReaper::~TransactionReaper() = default;
-
-} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h
deleted file mode 100644
index 2e53794efeb..00000000000
--- a/src/mongo/db/transaction_reaper.h
+++ /dev/null
@@ -1,64 +0,0 @@
-
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-namespace mongo {
-
-class ServiceContext;
-class SessionsCollection;
-class OperationContext;
-
-/**
- * TransactionReaper is responsible for scanning the transaction table, checking if sessions are
- * still alive and deleting the transaction records if their sessions have expired.
- */
-class TransactionReaper {
-public:
- enum class Type {
- kReplicaSet,
- kSharded,
- };
-
- virtual ~TransactionReaper() = 0;
-
- virtual int reap(OperationContext* OperationContext) = 0;
-
- /**
- * The implementation of the sessions collections is different in replica sets versus sharded
- * clusters, so we have a factory to pick the right impl.
- */
- static std::unique_ptr<TransactionReaper> make(Type type,
- std::shared_ptr<SessionsCollection> collection);
-};
-
-} // namespace mongo
diff --git a/src/mongo/dbtests/query_stage_update.cpp b/src/mongo/dbtests/query_stage_update.cpp
index 4e8181bdb73..7d0632e53ce 100644
--- a/src/mongo/dbtests/query_stage_update.cpp
+++ b/src/mongo/dbtests/query_stage_update.cpp
@@ -210,9 +210,7 @@ public:
request.setQuery(query);
request.setUpdates(updates);
- const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>> arrayFilters;
-
- ASSERT_OK(driver.parse(request.getUpdates(), arrayFilters, request.isMulti()));
+ ASSERT_OK(driver.parse(request.getUpdates(), {}, request.isMulti()));
// Setup update params.
UpdateStageParams params(&request, &driver, opDebug);
@@ -284,9 +282,7 @@ public:
request.setQuery(query);
request.setUpdates(updates);
- const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>> arrayFilters;
-
- ASSERT_OK(driver.parse(request.getUpdates(), arrayFilters, request.isMulti()));
+ ASSERT_OK(driver.parse(request.getUpdates(), {}, request.isMulti()));
// Configure the scan.
CollectionScanParams collScanParams;
@@ -400,9 +396,7 @@ public:
request.setReturnDocs(UpdateRequest::RETURN_OLD);
request.setLifecycle(&updateLifecycle);
- const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>> arrayFilters;
-
- ASSERT_OK(driver.parse(request.getUpdates(), arrayFilters, request.isMulti()));
+ ASSERT_OK(driver.parse(request.getUpdates(), {}, request.isMulti()));
// Configure a QueuedDataStage to pass the first object in the collection back in a
// RID_AND_OBJ state.
@@ -491,9 +485,7 @@ public:
request.setReturnDocs(UpdateRequest::RETURN_NEW);
request.setLifecycle(&updateLifecycle);
- const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>> arrayFilters;
-
- ASSERT_OK(driver.parse(request.getUpdates(), arrayFilters, request.isMulti()));
+ ASSERT_OK(driver.parse(request.getUpdates(), {}, request.isMulti()));
// Configure a QueuedDataStage to pass the first object in the collection back in a
// RID_AND_OBJ state.
@@ -572,9 +564,7 @@ public:
request.setMulti(false);
request.setLifecycle(&updateLifecycle);
- const std::map<StringData, std::unique_ptr<ExpressionWithPlaceholder>> arrayFilters;
-
- ASSERT_OK(driver.parse(request.getUpdates(), arrayFilters, request.isMulti()));
+ ASSERT_OK(driver.parse(request.getUpdates(), {}, request.isMulti()));
// Configure a QueuedDataStage to pass an OWNED_OBJ to the update stage.
auto qds = make_unique<QueuedDataStage>(&_opCtx, ws.get());
diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
index 7f68667c466..e6159a3ec28 100644
--- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
+++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
@@ -32,8 +32,6 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/embedded/logical_session_cache_factory_embedded.h"
#include "mongo/db/logical_session_cache_impl.h"
@@ -47,11 +45,14 @@ namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() {
auto liaison = std::make_unique<ServiceLiaisonMongod>();
- // Set up the logical session cache
auto sessionsColl = std::make_shared<SessionsCollectionStandalone>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
+ std::move(liaison),
+ std::move(sessionsColl),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op */
+ });
}
} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index e18c7652c28..353b0887dfc 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -77,7 +77,6 @@ env.Library(
'client/sharding_network_connection_hook.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
'$BUILD_DIR/mongo/executor/network_interface_factory',
'$BUILD_DIR/mongo/executor/network_interface_thread_pool',
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index ce517052716..b4d5f97dd46 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -56,13 +56,15 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/log_process_details.h"
#include "mongo/db/logical_clock.h"
-#include "mongo/db/logical_session_cache_factory_mongos.h"
+#include "mongo/db/logical_session_cache_impl.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/service_liaison_mongos.h"
#include "mongo/db/session_killer.h"
+#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/startup_warnings_common.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/task_executor_pool.h"
@@ -85,7 +87,6 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/service_entry_point_mongos.h"
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
-#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
#include "mongo/s/version_mongos.h"
@@ -182,6 +183,9 @@ void cleanupTask(ServiceContext* serviceContext) {
{
Client::initThreadIfNotAlready();
Client& client = cc();
+
+ LogicalSessionCache::get(serviceContext)->joinOnShutDown();
+
ServiceContext::UniqueOperationContext uniqueTxn;
OperationContext* opCtx = client.getOperationContext();
if (!opCtx) {
@@ -434,8 +438,13 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));
- // Set up the logical session cache
- LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheS());
+ LogicalSessionCache::set(serviceContext,
+ stdx::make_unique<LogicalSessionCacheImpl>(
+ stdx::make_unique<ServiceLiaisonMongos>(),
+ stdx::make_unique<SessionsCollectionSharded>(),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op */
+ }));
status = serviceContext->getServiceExecutor()->start();
if (!status.isOK()) {