summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-03 16:21:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-09 07:40:56 -0400
commit2791817876636c0cfd60d867f31c7a83cf3f18c1 (patch)
tree3aefcb1999cccf4cb53b2401a44857549ba8722a
parent1b8a9f5dc5c3314042b55e7415a2a25045b32a94 (diff)
downloadmongo-2791817876636c0cfd60d867f31c7a83cf3f18c1.tar.gz
SERVER-37837 Get rid of TransactionReaper (Part 1)
This change gets rid of the TransactionReaper's usage of the ReplicationCoordinator for checking whether it is primary or not and makes the LogicalSessionCache joinable on shutdown. It also removes the TransactionReaper's grouping per-shard optimization and moves it all under SessionCollectionSharded.
-rw-r--r--src/mongo/SConscript24
-rw-r--r--src/mongo/db/SConscript27
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/end_sessions_command.cpp12
-rw-r--r--src/mongo/db/db.cpp14
-rw-r--r--src/mongo/db/logical_session_cache.h7
-rw-r--r--src/mongo/db/logical_session_cache.idl (renamed from src/mongo/db/logical_session_cache_impl.idl)23
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp24
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongos.cpp51
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp110
-rw-r--r--src/mongo/db/logical_session_cache_impl.h37
-rw-r--r--src/mongo/db/logical_session_cache_noop.h2
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp6
-rw-r--r--src/mongo/db/logical_session_id_test.cpp6
-rw-r--r--src/mongo/db/s/SConscript37
-rw-r--r--src/mongo/db/sessions_collection.cpp11
-rw-r--r--src/mongo/db/sessions_collection.h21
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp45
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp68
-rw-r--r--src/mongo/db/sessions_collection_sharded.h14
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp10
-rw-r--r--src/mongo/db/transaction_reaper.cpp289
-rw-r--r--src/mongo/db/transaction_reaper.h63
-rw-r--r--src/mongo/db/transaction_reaper.idl52
-rw-r--r--src/mongo/db/transaction_reaper_d.cpp133
-rw-r--r--src/mongo/db/transaction_reaper_d.h (renamed from src/mongo/db/logical_session_cache_factory_mongos.h)17
-rw-r--r--src/mongo/db/transaction_reaper_d_test.cpp (renamed from src/mongo/db/transaction_reaper_test.cpp)20
-rw-r--r--src/mongo/embedded/logical_session_cache_factory_embedded.cpp9
-rw-r--r--src/mongo/s/SConscript1
-rw-r--r--src/mongo/s/server.cpp17
30 files changed, 458 insertions, 695 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index a070a881f0a..871429b589d 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -328,14 +328,14 @@ mongod = env.Program(
'db/catalog/index_key_validate',
'db/cloner',
'db/collection_index_usage_tracker',
- 'db/commands/mongod',
'db/commands/mongod_fcv',
+ 'db/commands/mongod',
'db/commands/server_status_servers',
'db/common',
'db/concurrency/lock_manager',
'db/concurrency/write_conflict_exception',
- 'db/curop',
'db/curop_metrics',
+ 'db/curop',
'db/db_raii',
'db/dbdirectclient',
'db/dbhelpers',
@@ -343,9 +343,9 @@ mongod = env.Program(
'db/free_mon/free_mon_mongod',
'db/ftdc/ftdc_mongod',
'db/fts/ftsmongod',
+ 'db/index_builds_coordinator_mongod',
'db/index/index_access_method',
'db/index/index_descriptor',
- 'db/index_builds_coordinator_mongod',
'db/initialize_snmp',
'db/introspect',
'db/keys_collection_client_direct',
@@ -375,13 +375,13 @@ mongod = env.Program(
'db/repl/rs_rollback',
'db/repl/rslog',
'db/repl/serveronly_repl',
- 'db/repl/storage_interface',
'db/repl/storage_interface_impl',
+ 'db/repl/storage_interface',
'db/repl/topology_coordinator',
'db/rw_concern_d',
'db/s/balancer',
- 'db/s/commands_db_s',
'db/s/op_observer_sharding_impl',
+ 'db/s/sharding_commands_d',
'db/s/sharding_runtime_d',
'db/service_context_d',
'db/startup_warnings_mongod',
@@ -392,8 +392,8 @@ mongod = env.Program(
'db/storage/biggie/storage_biggie',
'db/storage/devnull/storage_devnull',
'db/storage/ephemeral_for_test/storage_ephemeral_for_test',
- 'db/storage/flow_control',
'db/storage/flow_control_parameters',
+ 'db/storage/flow_control',
'db/storage/storage_engine_lock_file',
'db/storage/storage_engine_metadata',
'db/storage/storage_init_d',
@@ -403,8 +403,8 @@ mongod = env.Program(
'db/traffic_recorder',
'db/ttl_collection_cache',
'db/ttl_d',
- 'db/update/update_driver',
'db/update_index_data',
+ 'db/update/update_driver',
'db/views/views_mongod',
'db/windows_options' if env.TargetOSIs('windows') else [],
'executor/network_interface_factory',
@@ -490,15 +490,19 @@ mongos = env.Program(
LIBDEPS=[
'db/audit',
'db/auth/authmongos',
- 'db/commands/server_status',
'db/commands/server_status_core',
'db/commands/server_status_servers',
+ 'db/commands/server_status',
'db/curop',
'db/ftdc/ftdc_mongos',
+ 'db/logical_session_cache_impl',
+ 'db/logical_session_cache',
'db/logical_time_metadata_hook',
'db/mongodandmongos',
- 'db/server_options',
'db/server_options_base',
+ 'db/server_options',
+ 'db/service_liaison_mongos',
+ 'db/sessions_collection_sharded',
'db/startup_warnings_common',
'db/stats/counters',
'db/windows_options' if env.TargetOSIs('windows') else [],
@@ -507,10 +511,10 @@ mongos = env.Program(
's/committed_optime_metadata_hook',
's/coreshard',
's/is_mongos',
+ 's/query/cluster_cursor_cleanup_job',
's/sharding_egress_metadata_hook_for_mongos',
's/sharding_initialization',
's/sharding_router_api',
- 's/query/cluster_cursor_cleanup_job',
'transport/message_compressor_options_server',
'transport/service_entry_point',
'transport/transport_layer_manager',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9f4088dba01..93928f28e42 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1501,6 +1501,7 @@ env.Library(
source=[
'logical_session_cache.cpp',
env.Idlc('logical_session_cache_stats.idl')[0],
+ env.Idlc('logical_session_cache.idl')[0],
],
LIBDEPS=[
'logical_session_id',
@@ -1517,8 +1518,6 @@ env.Library(
'initialize_operation_session_info.cpp',
'logical_session_cache_impl.cpp',
'logical_session_server_status_section.cpp',
- env.Idlc('logical_session_cache_impl.idl')[0],
- env.Idlc('commands/end_sessions.idl')[0],
],
LIBDEPS=[
'logical_session_cache',
@@ -1546,10 +1545,9 @@ env.Library(
)
env.Library(
- target='transaction_reaper',
+ target='transaction_reaper_d',
source=[
- 'transaction_reaper.cpp',
- env.Idlc('transaction_reaper.idl')[0],
+ 'transaction_reaper_d.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -1567,7 +1565,7 @@ envWithAsio.CppUnitTest(
target='logical_session_cache_test',
source=[
'logical_session_cache_test.cpp',
- 'transaction_reaper_test.cpp',
+ 'transaction_reaper_d_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
@@ -1584,7 +1582,7 @@ envWithAsio.CppUnitTest(
'service_context_d_test_fixture',
'service_liaison_mock',
'sessions_collection_mock',
- 'transaction_reaper',
+ 'transaction_reaper_d',
'transaction',
],
)
@@ -1604,20 +1602,7 @@ envWithAsio.Library(
'sessions_collection_standalone',
],
LIBDEPS_PRIVATE=[
- 'transaction_reaper',
- ],
-)
-
-envWithAsio.Library(
- target='logical_session_cache_factory_mongos',
- source=[
- 'logical_session_cache_factory_mongos.cpp',
- ],
- LIBDEPS=[
- 'logical_session_cache',
- 'logical_session_cache_impl',
- 'service_liaison_mongos',
- 'sessions_collection_sharded',
+ 'transaction_reaper_d',
],
)
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 7f83d0d192d..8c7fc3639ad 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -102,11 +102,12 @@ env.Library(
"kill_all_sessions_command.cpp",
"kill_sessions_command.cpp",
"parameters.cpp",
- env.Idlc('parameters.idl')[0],
"refresh_logical_session_cache_now.cpp",
"refresh_sessions_command.cpp",
"rename_collection_common.cpp",
"start_session_command.cpp",
+ env.Idlc('end_sessions.idl')[0],
+ env.Idlc('parameters.idl')[0],
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp
index 9822eebd60c..b8325015cf1 100644
--- a/src/mongo/db/commands/end_sessions_command.cpp
+++ b/src/mongo/db/commands/end_sessions_command.cpp
@@ -33,11 +33,13 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/end_sessions_gen.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
namespace mongo {
+namespace {
class EndSessionsCommand final : public BasicCommand {
EndSessionsCommand(const EndSessionsCommand&) = delete;
@@ -73,10 +75,10 @@ public:
}
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
auto lsCache = LogicalSessionCache::get(opCtx);
auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj);
@@ -84,6 +86,8 @@ public:
lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx));
return true;
}
+
} endSessionsCommand;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 94ec16df197..31477531f64 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -511,7 +511,11 @@ ExitCode _initAndListen(int listenPort) {
auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get())
->initializeShardingAwarenessIfNeeded(startupOpCtx.get());
if (shardingInitialized) {
- waitForShardRegistryReload(startupOpCtx.get()).transitional_ignore();
+ auto status = waitForShardRegistryReload(startupOpCtx.get());
+ if (!status.isOK()) {
+ LOG(0) << "Failed to load the shard registry as part of startup"
+ << causedBy(redact(status));
+ }
}
auto storageEngine = serviceContext->getStorageEngine();
@@ -622,8 +626,7 @@ ExitCode _initAndListen(int listenPort) {
kind = LogicalSessionCacheServer::kReplicaSet;
}
- auto sessionCache = makeLogicalSessionCacheD(kind);
- LogicalSessionCache::set(serviceContext, std::move(sessionCache));
+ LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheD(kind));
// MessageServer::run will return when exit code closes its socket and we don't need the
// operation context anymore
@@ -903,6 +906,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
balancer->waitForBalancerToStop();
}
+ // Join the logical session cache before the transport layer.
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
// Shutdown the TransportLayer so that new connections aren't accepted
if (auto tl = serviceContext->getTransportLayer()) {
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets...";
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 703ec97a337..e035ee13d7f 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -32,7 +32,7 @@
#include <boost/optional.hpp>
#include "mongo/base/status.h"
-#include "mongo/db/commands/end_sessions_gen.h"
+#include "mongo/db/logical_session_cache_gen.h"
#include "mongo/db/logical_session_cache_stats_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -58,6 +58,11 @@ public:
virtual ~LogicalSessionCache() = 0;
/**
+ * Invoked on service shutdown time in order to join the cache's refresher and reaper tasks.
+ */
+ virtual void joinOnShutDown() = 0;
+
+ /**
* If the cache contains a record for this LogicalSessionId, promotes that lsid
* to be the most recently used and updates its lastUse date to be the current
* time. Returns an error if the session was not found.
diff --git a/src/mongo/db/logical_session_cache_impl.idl b/src/mongo/db/logical_session_cache.idl
index 71cd63e1ed3..3294df14378 100644
--- a/src/mongo/db/logical_session_cache_impl.idl
+++ b/src/mongo/db/logical_session_cache.idl
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-present MongoDB, Inc.
+# Copyright (C) 2019-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
@@ -52,3 +52,24 @@ server_parameters:
cpp_vartype: bool
cpp_varname: disableLogicalSessionCacheRefresh
default: false
+
+ TransactionRecordMinimumLifetimeMinutes:
+ # The minimum lifetime for a transaction record is how long it has to have lived on the server
+ # before we'll consider it for cleanup. This is effectively the window for how long it is
+ # permissible for a mongos to hang before we're willing to accept a failure of the retryable
+ # writes subsystem.
+ #
+ # Specifically, we imagine that a client connects to one mongos on a session and performs a
+ # retryable write after which that mongos hangs. Then the client connects to a new mongos on the
+ # same session and successfully executes its write. After a day passes, the session will time
+ # out, cleaning up the retryable write. Then the original mongos wakes up, vivifies the session
+ # and executes the write (because all records of the session + transaction have been deleted).
+ #
+ # So the write is performed twice, which is unavoidable without losing session vivification
+ # and/or requiring synchronized clocks all the way out to the client. In lieu of that we
+ # provide a weaker guarantee after the minimum transaction lifetime.
+ description: The minimum lifetime for a transaction record.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gTransactionRecordMinimumLifetimeMinutes
+ default: 30
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 6610702c1ca..2d9870e258a 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -31,8 +31,6 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/db/logical_session_cache_factory_mongod.h"
#include "mongo/db/logical_session_cache_impl.h"
@@ -41,14 +39,14 @@
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/sessions_collection_standalone.h"
-#include "mongo/db/transaction_reaper.h"
+#include "mongo/db/transaction_reaper_d.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) {
- auto liaison = stdx::make_unique<ServiceLiaisonMongod>();
+ auto liaison = std::make_unique<ServiceLiaisonMongod>();
auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> {
switch (state) {
@@ -65,22 +63,8 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach
MONGO_UNREACHABLE;
}();
- auto reaper = [&]() -> std::shared_ptr<TransactionReaper> {
- switch (state) {
- case LogicalSessionCacheServer::kSharded:
- return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl);
- case LogicalSessionCacheServer::kReplicaSet:
- return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl);
- case LogicalSessionCacheServer::kConfigServer:
- case LogicalSessionCacheServer::kStandalone:
- return nullptr;
- }
-
- MONGO_UNREACHABLE;
- }();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), std::move(reaper));
+ return std::make_unique<LogicalSessionCacheImpl>(
+ std::move(liaison), std::move(sessionsColl), TransactionReaperD::reapSessionsOlderThan);
}
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp
deleted file mode 100644
index 2f8942c8d85..00000000000
--- a/src/mongo/db/logical_session_cache_factory_mongos.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/logical_session_cache_factory_mongos.h"
-
-#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/service_liaison_mongos.h"
-#include "mongo/db/sessions_collection_sharded.h"
-#include "mongo/stdx/memory.h"
-
-namespace mongo {
-
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() {
- auto liaison = stdx::make_unique<ServiceLiaisonMongos>();
- auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
-}
-
-} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 24d278c9150..2095595eb5f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) {
} // namespace
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(
- std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)) {
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -73,26 +71,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
[this](Client* client) { _periodicRefresh(client); },
Milliseconds(logicalSessionRefreshMillis)});
- if (_transactionReaper) {
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- Milliseconds(logicalSessionRefreshMillis)});
- }
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread, log but swallow the
- // error since there is no good way to recover
- severe() << "Failed to join background service thread";
- }
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
@@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() {
}
size_t LogicalSessionCacheImpl::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
}
@@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) {
}
Status LogicalSessionCacheImpl::_reap(Client* client) {
- if (!_transactionReaper) {
- return Status::OK();
- }
-
// Take the lock to update some stats.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the last set of stats for our new run.
_stats.setLastTransactionReaperJobDurationMillis(0);
@@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
- int numReaped = 0;
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- try {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ int numReaped = 0;
+ try {
ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
@@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- numReaped = _transactionReaper->reap(opCtx);
- } catch (...) {
+ numReaped =
+ _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ opCtx->getServiceContext()->getFastClockSource()->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
- return exceptionToStatus();
+ return ex.toStatus();
}
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
@@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
void LogicalSessionCacheImpl::_refresh(Client* client) {
// Stats for serverStatus:
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the refresh-related stats with the beginning of our run.
_stats.setLastSessionsCollectionJobDurationMillis(0);
@@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = makeGuard([this] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
{
using std::swap;
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
swap(explicitlyEndingSessions, _endingSessions);
swap(activeSessions, _activeSessions);
}
@@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// swapped out of LogicalSessionCache, and merges in any records that had been added since we
// swapped them out.
auto backSwap = [this](auto& member, auto& temp) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
using std::swap;
swap(member, temp);
for (const auto& it : temp) {
@@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
activeSessions.erase(lsid);
}
- // refresh all recently active sessions as well as for sessions attached to running ops
-
- LogicalSessionRecordSet activeSessionRecords{};
+ // Refresh all recently active sessions as well as for sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
auto runningOpSessions = _service->getActiveOpSessions();
@@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
}
@@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
}
@@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
// killing cursors on the removed sessions and creating sessions.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
for (const auto& it : _activeSessions) {
auto newSessionIt = openCursorSessions.find(it.first);
@@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
SessionKiller::Matcher matcher(std::move(patterns));
auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
}
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_endingSessions.insert(begin(sessions), end(sessions));
}
LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setActiveSessionsCount(_activeSessions.size());
return _stats;
}
Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
}
+
_activeSessions.insert(std::make_pair(record.getId(), record));
return Status::OK();
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
ret.reserve(_activeSessions.size());
for (const auto& id : _activeSessions) {
@@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
@@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto it = _activeSessions.find(id);
if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
}
+
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index b8c4001ace6..96c7f80d717 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -30,23 +30,12 @@
#pragma once
#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/refresh_sessions_gen.h"
#include "mongo/db/service_liaison.h"
#include "mongo/db/sessions_collection.h"
-#include "mongo/db/time_proof_service.h"
-#include "mongo/db/transaction_reaper.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/lru_cache.h"
+#include "mongo/util/functional.h"
namespace mongo {
-class Client;
-class OperationContext;
-class ServiceContext;
-
/**
* A thread-safe cache structure for logical session records.
*
@@ -63,18 +52,20 @@ class ServiceContext;
*/
class LogicalSessionCacheImpl final : public LogicalSessionCache {
public:
- /**
- * Construct a new session cache.
- */
+ using ReapSessionsOlderThanFn =
+ unique_function<int(OperationContext*, SessionsCollection&, Date_t)>;
+
LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper);
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn);
LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete;
LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete;
~LogicalSessionCacheImpl();
+ void joinOnShutDown() override;
+
Status promote(LogicalSessionId lsid) override;
Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
@@ -126,23 +117,19 @@ private:
*/
Status _addToCache(LogicalSessionRecord record);
- // This value is only modified under the lock, and is modified
- // automatically by the background jobs.
- LogicalSessionCacheStats _stats;
-
std::unique_ptr<ServiceLiaison> _service;
std::shared_ptr<SessionsCollection> _sessionsColl;
+ ReapSessionsOlderThanFn _reapSessionsOlderThanFn;
- mutable stdx::mutex _reaperMutex;
- std::shared_ptr<TransactionReaper> _transactionReaper;
-
- mutable stdx::mutex _cacheMutex;
+ mutable stdx::mutex _mutex;
LogicalSessionIdMap<LogicalSessionRecord> _activeSessions;
LogicalSessionIdSet _endingSessions;
- Date_t lastRefreshTime;
+ Date_t _lastRefreshTime;
+
+ LogicalSessionCacheStats _stats;
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index a81f6ee8299..8b5bb312102 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -42,6 +42,8 @@ class ServiceContext;
*/
class LogicalSessionCacheNoop : public LogicalSessionCache {
public:
+ void joinOnShutDown() override {}
+
Status promote(LogicalSessionId lsid) override {
return Status::OK();
}
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index e7b61b664b0..a5cc6e8eed0 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -79,7 +79,11 @@ public:
auto mockService = stdx::make_unique<MockServiceLiaison>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache = stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(mockService), std::move(mockSessions), nullptr /* reaper */);
+ std::move(mockService),
+ std::move(mockSessions),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ });
}
void waitUntilRefreshScheduled() {
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index d994458332f..160e718201d 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -92,7 +92,11 @@ public:
std::make_shared<MockSessionsCollectionImpl>());
auto localLogicalSessionCache = std::make_unique<LogicalSessionCacheImpl>(
- std::move(localServiceLiaison), std::move(localSessionsCollection), nullptr);
+ std::move(localServiceLiaison),
+ std::move(localSessionsCollection),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ });
LogicalSessionCache::set(getServiceContext(), std::move(localLogicalSessionCache));
}
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 0fc1b35ee08..54050af3feb 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -278,7 +278,7 @@ env.Library(
)
env.Library(
- target='commands_db_s',
+ target='sharding_commands_d',
source=[
'add_shard_cmd.cpp',
'check_sharding_index_command.cpp',
@@ -372,20 +372,25 @@ env.CppUnitTest(
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
+ 'session_catalog_migration_destination_test.cpp',
+ 'session_catalog_migration_source_test.cpp',
'shard_metadata_util_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
'sharding_initialization_mongod_test.cpp',
'sharding_initialization_op_observer_test.cpp',
'split_vector_test.cpp',
],
- LIBDEPS=[
- 'sharding_runtime_d',
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/logical_session_cache_impl',
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/shard_server_test_fixture',
+ 'sharding_runtime_d',
],
)
@@ -427,32 +432,6 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='session_catalog_migration_source_test',
- source=[
- 'session_catalog_migration_source_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
- 'sharding_runtime_d',
- ]
-)
-
-env.CppUnitTest(
- target='session_catalog_migration_destination_test',
- source=[
- 'session_catalog_migration_destination_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/ops/write_ops_exec',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
- '$BUILD_DIR/mongo/s/shard_server_test_fixture',
- 'sharding_runtime_d',
- ]
-)
-
-env.CppUnitTest(
target='sharding_catalog_manager_test',
source=[
'config/initial_split_policy_test.cpp',
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index fda30fb70f3..7ffc762201e 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -212,7 +212,7 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N
}
Status SessionsCollection::doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("update", ns.coll());
@@ -230,7 +230,7 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns,
}
Status SessionsCollection::doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("delete", ns.coll());
@@ -245,16 +245,15 @@ Status SessionsCollection::doRemove(const NamespaceString& ns,
return runBulkCmd("deletes", init, add, send, sessions);
}
-StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send) {
+StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved(
+ const NamespaceString& ns, const std::vector<LogicalSessionId>& sessions, FindBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionId>{}; };
auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) {
batch.push_back(record);
};
- LogicalSessionIdSet removed = sessions;
+ LogicalSessionIdSet removed{sessions.begin(), sessions.end()};
auto wrappedSend = [&](BSONObj batch) {
auto swBatchResult = send(batch);
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index b8bdfe83ed2..64e78c8a476 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -102,31 +102,34 @@ protected:
* Makes a send function for the given client.
*/
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
- SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
- SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+
using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>;
- FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
*/
Status doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send);
/**
* Formats and sends batches of deletes for the given set of sessions.
*/
Status doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send);
/**
- * Formats and sends batches of fetches for the given set of sessions.
+ * Returns those lsids from the input 'sessions' array which are not present in the sessions
+ * collection (essentially performs an inner join of 'sessions' against the sessions
+ * collection).
*/
- StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send);
+ StatusWith<LogicalSessionIdSet> doFindRemoved(const NamespaceString& ns,
+ const std::vector<LogicalSessionId>& sessions,
+ FindBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index 7abc9dfd1ae..0069d3372f3 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -207,18 +207,20 @@ Status SessionsCollectionRS::checkSessionsCollectionExists(OperationContext* opC
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
+ const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end());
+
return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, client));
});
@@ -226,18 +228,20 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, client));
});
@@ -245,21 +249,24 @@ Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
- return dispatch(NamespaceString::kLogicalSessionsNamespace,
- opCtx,
- [&] {
- DBDirectClient client(opCtx);
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(
- NamespaceString::kLogicalSessionsNamespace, &client));
- },
- [&](DBClientBase* client) {
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(
- NamespaceString::kLogicalSessionsNamespace, client));
- });
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
+ return dispatch(
+ NamespaceString::kLogicalSessionsNamespace,
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
+ },
+ [&](DBClientBase* client) {
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, client));
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index cf1be44431c..e65ddd0dab5 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -40,6 +40,8 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont
return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"};
}
+std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionIdsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionId> sessionIdsGroupedByShard;
+ sessionIdsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionIdsByOwningShard) {
+ sessionIdsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionIdsGroupedByShard;
+}
+
+std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard;
+ sessionRecordsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionsByOwningShard) {
+ sessionRecordsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionRecordsGroupedByShard;
+}
+
Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) {
return checkSessionsCollectionExists(opCtx);
}
@@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return response.toStatus();
};
- return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionRecordsByOwningShard(opCtx, sessions),
+ send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return response.toStatus();
};
- return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return replyBuilder.releaseBody();
};
- return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index bd377d369bd..95aba591cb7 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -73,6 +73,20 @@ public:
protected:
Status _checkCacheForSessionsCollection(OperationContext* opCtx);
+
+ /**
+ * These two methods use the sharding routing metadata to do a best effort attempt at grouping
+ * the specified set of sessions by the shards, which have the records for these sessions. This
+ * is done as an attempt to avoid broadcast queries.
+ *
+ * The reason it is 'best effort' is because it makes no attempt at checking whether the routing
+ * table is up-to-date and just picks up whatever was most recently fetched from the config
+ * server, which could be stale.
+ */
+ std::vector<LogicalSessionId> _groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions);
+ std::vector<LogicalSessionRecord> _groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 32e93491fbf..25ae4db3dfa 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -100,7 +100,7 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
DBDirectClient client(opCtx);
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ std::vector(sessions.begin(), sessions.end()),
makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
@@ -108,16 +108,16 @@ Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ std::vector(sessions.begin(), sessions.end()),
makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ std::vector(sessions.begin(), sessions.end()),
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
deleted file mode 100644
index e1ac967ef59..00000000000
--- a/src/mongo/db/transaction_reaper.cpp
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/transaction_reaper.h"
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/session_txn_record_gen.h"
-#include "mongo/db/sessions_collection.h"
-#include "mongo/db/transaction_reaper_gen.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace {
-
-const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
-
-/**
- * Makes the query we'll use to scan the transactions table.
- *
- * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt
- * to pull records likely to be on the same chunks (because they sort near each other).
- */
-Query makeQuery(Date_t now) {
- const Date_t possiblyExpired(now - Minutes(gTransactionRecordMinimumLifetimeMinutes));
- Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
- query.sort(kSortById);
- return query;
-}
-
-/**
- * Our impl is templatized on a type which handles the lsids we see. It provides the top level
- * scaffolding for figuring out if we're the primary node responsible for the transaction table and
- * invoking the handler.
- *
- * The handler here will see all of the possibly expired txn ids in the transaction table and will
- * have a lifetime associated with a single call to reap.
- */
-template <typename Handler>
-class TransactionReaperImpl final : public TransactionReaper {
-public:
- TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
- : _collection(std::move(collection)) {}
-
- int reap(OperationContext* opCtx) override {
- Handler handler(opCtx, *_collection);
- if (!handler.initialize()) {
- return 0;
- }
-
- // Make a best-effort attempt to only reap when the node is running as a primary
- const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (!coord->canAcceptWritesForDatabase_UNSAFE(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
- return 0;
- }
-
- DBDirectClient client(opCtx);
-
- // Fill all stale config.transactions entries
- auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
- auto cursor = client.query(
- NamespaceString::kSessionTransactionsTableNamespace, query, 0, 0, &kIdProjection);
-
- while (cursor->more()) {
- auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
- "TransactionSession"_sd, cursor->next());
-
- handler.handleLsid(transactionSession.get_id());
- }
-
- // Before the handler goes out of scope, flush its last batch to disk and collect stats.
- return handler.finalize();
- }
-
-private:
- std::shared_ptr<SessionsCollection> _collection;
-};
-
-/**
- * Removes the specified set of session ids from the persistent sessions collection and returns the
- * number of sessions actually removed.
- */
-int removeSessionsTransactionRecords(OperationContext* opCtx,
- SessionsCollection& sessionsCollection,
- const LogicalSessionIdSet& sessionIdsToRemove) {
- if (sessionIdsToRemove.empty()) {
- return 0;
- }
-
- // From the passed-in sessions, find the ones which are actually expired/removed
- auto expiredSessionIds =
- uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
-
- DBDirectClient client(opCtx);
- int numDeleted = 0;
-
- for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) {
- write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
- deleteOp.setWriteCommandBase([] {
- write_ops::WriteCommandBase base;
- base.setOrdered(false);
- return base;
- }());
- deleteOp.setDeletes([&] {
- // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
- // size limit
- const int kMaxBatchSize = 10'000;
- std::vector<write_ops::DeleteOpEntry> entries;
- for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) {
- entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()),
- false /* multi = false */);
- }
- return entries;
- }());
-
- BSONObj result;
- client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
- deleteOp.toBSON({}),
- result);
-
- BatchedCommandResponse response;
- std::string errmsg;
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse response " << result,
- response.parseBSON(result, &errmsg));
- uassertStatusOK(response.getTopLevelStatus());
-
- numDeleted += response.getN();
- }
-
- return numDeleted;
-}
-
-/**
- * The repl impl is simple, just pass along to the sessions collection for checking ids locally
- */
-class ReplHandler {
-public:
- ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- bool initialize() {
- return true;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- _batch.insert(lsid);
-
- if (_batch.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- _batch.clear();
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- LogicalSessionIdSet _batch;
-
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-/**
- * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small
- * scans.
- */
-class ShardedHandler {
-public:
- ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- // Returns false if the sessions collection is not set up.
- bool initialize() {
- auto routingInfo =
- uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
- _opCtx, NamespaceString::kLogicalSessionsNamespace));
- _cm = routingInfo.cm();
- return !!_cm;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- invariant(_cm);
-
- // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain
- // batches of lsids, which only fall on the same shard, so that the query to check whether
- // they are alive doesn't need to do cross-shard scatter/gather queries
- const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
- const auto& shardId = chunk.getShardId();
-
- auto& lsids = _shards[shardId];
- lsids.insert(lsid);
-
- if (lsids.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids);
- _shards.erase(shardId);
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- for (const auto& pair : _shards) {
- _numReaped +=
- removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second);
- }
-
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- std::shared_ptr<ChunkManager> _cm;
-
- stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-} // namespace
-
-std::unique_ptr<TransactionReaper> TransactionReaper::make(
- Type type, std::shared_ptr<SessionsCollection> collection) {
- switch (type) {
- case Type::kReplicaSet:
- return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection));
- case Type::kSharded:
- return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection));
- }
- MONGO_UNREACHABLE;
-}
-
-TransactionReaper::~TransactionReaper() = default;
-
-} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h
deleted file mode 100644
index 4c3c252e634..00000000000
--- a/src/mongo/db/transaction_reaper.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-namespace mongo {
-
-class ServiceContext;
-class SessionsCollection;
-class OperationContext;
-
-/**
- * TransactionReaper is responsible for scanning the transaction table, checking if sessions are
- * still alive and deleting the transaction records if their sessions have expired.
- */
-class TransactionReaper {
-public:
- enum class Type {
- kReplicaSet,
- kSharded,
- };
-
- virtual ~TransactionReaper() = 0;
-
- virtual int reap(OperationContext* OperationContext) = 0;
-
- /**
- * The implementation of the sessions collections is different in replica sets versus sharded
- * clusters, so we have a factory to pick the right impl.
- */
- static std::unique_ptr<TransactionReaper> make(Type type,
- std::shared_ptr<SessionsCollection> collection);
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.idl b/src/mongo/db/transaction_reaper.idl
deleted file mode 100644
index 7146d869a3f..00000000000
--- a/src/mongo/db/transaction_reaper.idl
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (C) 2019-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-global:
- cpp_namespace: "mongo"
-
-server_parameters:
- TransactionRecordMinimumLifetimeMinutes:
- # The minimum lifetime for a transaction record is how long it has to have lived on the server
- # before we'll consider it for cleanup. This is effectively the window for how long it is
- # permissible for a mongos to hang before we're willing to accept a failure of the retryable write
- # subsystem.
- #
- # Specifically, we imagine that a client connects to one mongos on a session and performs a
- # retryable write. That mongos hangs. Then the client connects to a new mongos on the same
- # session and successfully executes its write. After a day passes, the session will time out,
- # cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
- # executes the write (because all records of the session + transaction have been deleted).
- #
- # So the write is performed twice, which is unavoidable without losing session vivification and/or
- # requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
- # guarantee after the minimum transaction lifetime.
- description: 'The minimum lifetime for a transaction record.'
- set_at: startup
- cpp_vartype: int
- cpp_varname: gTransactionRecordMinimumLifetimeMinutes
- default: 30
diff --git a/src/mongo/db/transaction_reaper_d.cpp b/src/mongo/db/transaction_reaper_d.cpp
new file mode 100644
index 00000000000..1370c60c4f6
--- /dev/null
+++ b/src/mongo/db/transaction_reaper_d.cpp
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_reaper_d.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
+
+/**
+ * Removes the specified set of session ids from the persistent sessions collection and returns the
+ * number of sessions actually removed.
+ */
+int removeSessionsTransactionRecords(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ const LogicalSessionIdSet& sessionIdsToRemove) {
+ if (sessionIdsToRemove.empty()) {
+ return 0;
+ }
+
+ // From the passed-in sessions, find the ones which are actually expired/removed
+ auto expiredSessionIds =
+ uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
+
+ write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ deleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end(); ++it) {
+ entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()),
+ false /* multi = false */);
+ }
+ return entries;
+ }());
+
+ BSONObj result;
+
+ DBDirectClient client(opCtx);
+ client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+ deleteOp.toBSON({}),
+ result);
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse response " << result,
+ response.parseBSON(result, &errmsg));
+ uassertStatusOK(response.getTopLevelStatus());
+
+ return response.getN();
+}
+
+} // namespace
+
+int TransactionReaperD::reapSessionsOlderThan(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired) {
+ // Scan for records older than the minimum lifetime and uses a sort to walk the '_id' index
+ DBDirectClient client(opCtx);
+ auto cursor =
+ client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ Query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)).sort(kSortById),
+ 0,
+ 0,
+ &kIdProjection);
+
+ // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
+ // size limit
+ LogicalSessionIdSet lsids;
+ const int kMaxBatchSize = 10'000;
+
+ int numReaped = 0;
+ while (cursor->more()) {
+ auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
+ "TransactionSession"_sd, cursor->next());
+
+ lsids.insert(transactionSession.get_id());
+ if (lsids.size() > kMaxBatchSize) {
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+ lsids.clear();
+ }
+ }
+
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+
+ return numReaped;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.h b/src/mongo/db/transaction_reaper_d.h
index 739c633631d..057583b0ec9 100644
--- a/src/mongo/db/logical_session_cache_factory_mongos.h
+++ b/src/mongo/db/transaction_reaper_d.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -29,13 +29,18 @@
#pragma once
-#include <memory>
-
-#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/service_liaison.h"
+#include "mongo/util/time_support.h"
namespace mongo {
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS();
+class SessionsCollection;
+class OperationContext;
+
+class TransactionReaperD {
+public:
+ static int reapSessionsOlderThan(OperationContext* OperationContext,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired);
+};
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/transaction_reaper_d_test.cpp
index 546ba58568a..165ed3597a0 100644
--- a/src/mongo/db/transaction_reaper_test.cpp
+++ b/src/mongo/db/transaction_reaper_d_test.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection_mock.h"
-#include "mongo/db/transaction_reaper.h"
+#include "mongo/db/transaction_reaper_d.h"
#include "mongo/util/clock_source_mock.h"
namespace mongo {
@@ -61,15 +61,12 @@ protected:
std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{
std::make_shared<MockSessionsCollectionImpl>()};
- std::unique_ptr<TransactionReaper> _reaper{
- TransactionReaper::make(TransactionReaper::Type::kReplicaSet,
- std::make_shared<MockSessionsCollection>(_collectionMock))};
+ std::shared_ptr<SessionsCollection> _collection{
+ std::make_shared<MockSessionsCollection>(_collectionMock)};
};
TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
- _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
- _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
-
+ // Create some "old" sessions
DBDirectClient client(_opCtx);
SessionTxnRecord txn1(
makeLogicalSessionIdForTest(), 100, repl::OpTime(Timestamp(100), 1), clock()->now());
@@ -79,8 +76,15 @@ TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
std::vector{txn1.toBSON(), txn2.toBSON()});
+ // Add some "new" sessions to ensure they don't get reaped
clock()->advance(Minutes{31});
- ASSERT_EQ(2, _reaper->reap(_opCtx));
+ _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
+ _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
+
+
+ ASSERT_EQ(2,
+ TransactionReaperD::reapSessionsOlderThan(
+ _opCtx, *_collection, clock()->now() - Minutes{30}));
}
} // namespace
diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
index ae9f55f6ae8..fa606b87c3f 100644
--- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
+++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
@@ -31,8 +31,6 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/embedded/logical_session_cache_factory_embedded.h"
#include "mongo/db/logical_session_cache_impl.h"
@@ -46,11 +44,14 @@ namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() {
auto liaison = std::make_unique<ServiceLiaisonMongod>();
- // Set up the logical session cache
auto sessionsColl = std::make_shared<SessionsCollectionStandalone>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
+ std::move(liaison),
+ std::move(sessionsColl),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ });
}
} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 23a1e9a5019..88db8654426 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -106,7 +106,6 @@ env.Library(
'client/sharding_network_connection_hook.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos',
'$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl',
'$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 29c490ba8b0..6580faeb0c5 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -55,13 +55,15 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/log_process_details.h"
#include "mongo/db/logical_clock.h"
-#include "mongo/db/logical_session_cache_factory_mongos.h"
+#include "mongo/db/logical_session_cache_impl.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/service_liaison_mongos.h"
#include "mongo/db/session_killer.h"
+#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/startup_warnings_common.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/task_executor_pool.h"
@@ -84,7 +86,6 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/service_entry_point_mongos.h"
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
-#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
#include "mongo/s/version_mongos.h"
@@ -187,6 +188,9 @@ void cleanupTask(ServiceContext* serviceContext) {
Client::initThread(getThreadName());
Client& client = cc();
+ // Join the logical session cache before the transport layer
+ LogicalSessionCache::get(serviceContext)->joinOnShutDown();
+
// Shutdown the TransportLayer so that new connections aren't accepted
if (auto tl = serviceContext->getTransportLayer()) {
log(LogComponent::kNetwork) << "shutdown: going to close all sockets...";
@@ -496,8 +500,13 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));
- // Set up the logical session cache
- LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheS());
+ LogicalSessionCache::set(serviceContext,
+ stdx::make_unique<LogicalSessionCacheImpl>(
+ stdx::make_unique<ServiceLiaisonMongos>(),
+ stdx::make_unique<SessionsCollectionSharded>(),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ }));
status = serviceContext->getServiceExecutor()->start();
if (!status.isOK()) {