Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/SConscript | 24
-rw-r--r-- | src/mongo/db/SConscript | 27
-rw-r--r-- | src/mongo/db/commands/SConscript | 3
-rw-r--r-- | src/mongo/db/commands/end_sessions_command.cpp | 12
-rw-r--r-- | src/mongo/db/db.cpp | 14
-rw-r--r-- | src/mongo/db/logical_session_cache.h | 7
-rw-r--r-- | src/mongo/db/logical_session_cache.idl (renamed from src/mongo/db/logical_session_cache_impl.idl) | 23
-rw-r--r-- | src/mongo/db/logical_session_cache_factory_mongod.cpp | 24
-rw-r--r-- | src/mongo/db/logical_session_cache_factory_mongos.cpp | 51
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 110
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.h | 37
-rw-r--r-- | src/mongo/db/logical_session_cache_noop.h | 2
-rw-r--r-- | src/mongo/db/logical_session_cache_test.cpp | 6
-rw-r--r-- | src/mongo/db/logical_session_id_test.cpp | 6
-rw-r--r-- | src/mongo/db/s/SConscript | 37
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 11
-rw-r--r-- | src/mongo/db/sessions_collection.h | 21
-rw-r--r-- | src/mongo/db/sessions_collection_rs.cpp | 45
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.cpp | 68
-rw-r--r-- | src/mongo/db/sessions_collection_sharded.h | 14
-rw-r--r-- | src/mongo/db/sessions_collection_standalone.cpp | 10
-rw-r--r-- | src/mongo/db/transaction_reaper.cpp | 289
-rw-r--r-- | src/mongo/db/transaction_reaper.h | 63
-rw-r--r-- | src/mongo/db/transaction_reaper.idl | 52
-rw-r--r-- | src/mongo/db/transaction_reaper_d.cpp | 133
-rw-r--r-- | src/mongo/db/transaction_reaper_d.h (renamed from src/mongo/db/logical_session_cache_factory_mongos.h) | 17
-rw-r--r-- | src/mongo/db/transaction_reaper_d_test.cpp (renamed from src/mongo/db/transaction_reaper_test.cpp) | 20
-rw-r--r-- | src/mongo/embedded/logical_session_cache_factory_embedded.cpp | 9
-rw-r--r-- | src/mongo/s/SConscript | 1
-rw-r--r-- | src/mongo/s/server.cpp | 17
30 files changed, 458 insertions, 695 deletions
diff --git a/src/mongo/SConscript b/src/mongo/SConscript
index a070a881f0a..871429b589d 100644
--- a/src/mongo/SConscript
+++ b/src/mongo/SConscript
@@ -328,14 +328,14 @@ mongod = env.Program(
'db/catalog/index_key_validate',
'db/cloner',
'db/collection_index_usage_tracker',
- 'db/commands/mongod',
'db/commands/mongod_fcv',
+ 'db/commands/mongod',
'db/commands/server_status_servers',
'db/common',
'db/concurrency/lock_manager',
'db/concurrency/write_conflict_exception',
- 'db/curop',
'db/curop_metrics',
+ 'db/curop',
'db/db_raii',
'db/dbdirectclient',
'db/dbhelpers',
@@ -343,9 +343,9 @@ mongod = env.Program(
'db/free_mon/free_mon_mongod',
'db/ftdc/ftdc_mongod',
'db/fts/ftsmongod',
+ 'db/index_builds_coordinator_mongod',
'db/index/index_access_method',
'db/index/index_descriptor',
- 'db/index_builds_coordinator_mongod',
'db/initialize_snmp',
'db/introspect',
'db/keys_collection_client_direct',
@@ -375,13 +375,13 @@ mongod = env.Program(
'db/repl/rs_rollback',
'db/repl/rslog',
'db/repl/serveronly_repl',
- 'db/repl/storage_interface',
'db/repl/storage_interface_impl',
+ 'db/repl/storage_interface',
'db/repl/topology_coordinator',
'db/rw_concern_d',
'db/s/balancer',
- 'db/s/commands_db_s',
'db/s/op_observer_sharding_impl',
+ 'db/s/sharding_commands_d',
'db/s/sharding_runtime_d',
'db/service_context_d',
'db/startup_warnings_mongod',
@@ -392,8 +392,8 @@ mongod = env.Program(
'db/storage/biggie/storage_biggie',
'db/storage/devnull/storage_devnull',
'db/storage/ephemeral_for_test/storage_ephemeral_for_test',
- 'db/storage/flow_control',
'db/storage/flow_control_parameters',
+ 'db/storage/flow_control',
'db/storage/storage_engine_lock_file',
'db/storage/storage_engine_metadata',
'db/storage/storage_init_d',
@@ -403,8 +403,8 @@ mongod = env.Program(
'db/traffic_recorder',
'db/ttl_collection_cache',
'db/ttl_d',
- 'db/update/update_driver',
'db/update_index_data',
+ 'db/update/update_driver',
'db/views/views_mongod',
'db/windows_options' if env.TargetOSIs('windows') else [],
'executor/network_interface_factory',
@@ -490,15 +490,19 @@ mongos = env.Program(
LIBDEPS=[
'db/audit',
'db/auth/authmongos',
- 'db/commands/server_status',
'db/commands/server_status_core',
'db/commands/server_status_servers',
+ 'db/commands/server_status',
'db/curop',
'db/ftdc/ftdc_mongos',
+ 'db/logical_session_cache_impl',
+ 'db/logical_session_cache',
'db/logical_time_metadata_hook',
'db/mongodandmongos',
- 'db/server_options',
'db/server_options_base',
+ 'db/server_options',
+ 'db/service_liaison_mongos',
+ 'db/sessions_collection_sharded',
'db/startup_warnings_common',
'db/stats/counters',
'db/windows_options' if env.TargetOSIs('windows') else [],
@@ -507,10 +511,10 @@ mongos = env.Program(
's/committed_optime_metadata_hook',
's/coreshard',
's/is_mongos',
+ 's/query/cluster_cursor_cleanup_job',
's/sharding_egress_metadata_hook_for_mongos',
's/sharding_initialization',
's/sharding_router_api',
- 's/query/cluster_cursor_cleanup_job',
'transport/message_compressor_options_server',
'transport/service_entry_point',
'transport/transport_layer_manager',
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9f4088dba01..93928f28e42 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1501,6 +1501,7 @@ env.Library(
source=[
'logical_session_cache.cpp',
env.Idlc('logical_session_cache_stats.idl')[0],
+ env.Idlc('logical_session_cache.idl')[0],
],
LIBDEPS=[
'logical_session_id',
@@ -1517,8 +1518,6 @@ env.Library(
'initialize_operation_session_info.cpp',
'logical_session_cache_impl.cpp',
'logical_session_server_status_section.cpp',
- env.Idlc('logical_session_cache_impl.idl')[0],
- env.Idlc('commands/end_sessions.idl')[0],
],
LIBDEPS=[
'logical_session_cache',
@@ -1546,10 +1545,9 @@ env.Library(
)
env.Library(
- target='transaction_reaper',
+ target='transaction_reaper_d',
source=[
- 'transaction_reaper.cpp',
- env.Idlc('transaction_reaper.idl')[0],
+ 'transaction_reaper_d.cpp',
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
@@ -1567,7 +1565,7 @@ envWithAsio.CppUnitTest(
target='logical_session_cache_test',
source=[
'logical_session_cache_test.cpp',
- 'transaction_reaper_test.cpp',
+ 'transaction_reaper_d_test.cpp',
],
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
@@ -1584,7 +1582,7 @@ envWithAsio.CppUnitTest(
'service_context_d_test_fixture',
'service_liaison_mock',
'sessions_collection_mock',
- 'transaction_reaper',
+ 'transaction_reaper_d',
'transaction',
],
)
@@ -1604,20 +1602,7 @@ envWithAsio.Library(
'sessions_collection_standalone',
],
LIBDEPS_PRIVATE=[
- 'transaction_reaper',
- ],
-)
-
-envWithAsio.Library(
- target='logical_session_cache_factory_mongos',
- source=[
- 'logical_session_cache_factory_mongos.cpp',
- ],
- LIBDEPS=[
- 'logical_session_cache',
- 'logical_session_cache_impl',
- 'service_liaison_mongos',
- 'sessions_collection_sharded',
+ 'transaction_reaper_d',
],
)
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 7f83d0d192d..8c7fc3639ad 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -102,11 +102,12 @@ env.Library(
"kill_all_sessions_command.cpp",
"kill_sessions_command.cpp",
"parameters.cpp",
- env.Idlc('parameters.idl')[0],
"refresh_logical_session_cache_now.cpp",
"refresh_sessions_command.cpp",
"rename_collection_common.cpp",
"start_session_command.cpp",
+ env.Idlc('end_sessions.idl')[0],
+ env.Idlc('parameters.idl')[0],
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp
index 9822eebd60c..b8325015cf1 100644
--- a/src/mongo/db/commands/end_sessions_command.cpp
+++ b/src/mongo/db/commands/end_sessions_command.cpp
@@ -33,11 +33,13 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/end_sessions_gen.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
namespace mongo {
+namespace {
class EndSessionsCommand final : public BasicCommand {
EndSessionsCommand(const EndSessionsCommand&) = delete;
@@ -73,10 +75,10 @@ public:
}
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
auto lsCache = LogicalSessionCache::get(opCtx);
auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj);
@@ -84,6 +86,8 @@ public:
lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx));
return true;
}
+
} endSessionsCommand;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/db.cpp b/src/mongo/db/db.cpp
index 94ec16df197..31477531f64 100644
--- a/src/mongo/db/db.cpp
+++ b/src/mongo/db/db.cpp
@@ -511,7 +511,11 @@ ExitCode _initAndListen(int listenPort) {
auto shardingInitialized = ShardingInitializationMongoD::get(startupOpCtx.get())
->initializeShardingAwarenessIfNeeded(startupOpCtx.get());
if (shardingInitialized) {
- waitForShardRegistryReload(startupOpCtx.get()).transitional_ignore();
+ auto status = waitForShardRegistryReload(startupOpCtx.get());
+ if (!status.isOK()) {
+ LOG(0) << "Failed to load the shard registry as part of startup"
+ << causedBy(redact(status));
+ }
}
auto storageEngine = serviceContext->getStorageEngine();
@@ -622,8 +626,7 @@ ExitCode _initAndListen(int listenPort) {
kind = LogicalSessionCacheServer::kReplicaSet;
}
- auto sessionCache = makeLogicalSessionCacheD(kind);
- LogicalSessionCache::set(serviceContext, std::move(sessionCache));
+ LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheD(kind));
// MessageServer::run will return when exit code closes its socket and we don't need the
// operation context anymore
@@ -903,6 +906,11 @@ void shutdownTask(const ShutdownTaskArgs& shutdownArgs) {
balancer->waitForBalancerToStop();
}
+ // Join the logical session cache before the transport layer.
+ if (auto lsc = LogicalSessionCache::get(serviceContext)) {
+ lsc->joinOnShutDown();
+ }
+
// Shutdown the TransportLayer so that new connections aren't accepted
if (auto tl = serviceContext->getTransportLayer()) {
log(LogComponent::kNetwork) << "shutdown: going to close listening sockets...";
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 703ec97a337..e035ee13d7f 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -32,7 +32,7 @@
#include <boost/optional.hpp>
#include "mongo/base/status.h"
-#include "mongo/db/commands/end_sessions_gen.h"
+#include "mongo/db/logical_session_cache_gen.h"
#include "mongo/db/logical_session_cache_stats_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -58,6 +58,11 @@ public:
virtual ~LogicalSessionCache() = 0;
/**
+ * Invoked at service shutdown in order to join the cache's refresher and reaper tasks.
+ */
+ virtual void joinOnShutDown() = 0;
+
+ /**
* If the cache contains a record for this LogicalSessionId, promotes that lsid
* to be the most recently used and updates its lastUse date to be the current
* time. Returns an error if the session was not found.
diff --git a/src/mongo/db/logical_session_cache_impl.idl b/src/mongo/db/logical_session_cache.idl
index 71cd63e1ed3..3294df14378 100644
--- a/src/mongo/db/logical_session_cache_impl.idl
+++ b/src/mongo/db/logical_session_cache.idl
@@ -1,4 +1,4 @@
-# Copyright (C) 2018-present MongoDB, Inc.
+# Copyright (C) 2019-present MongoDB, Inc.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the Server Side Public License, version 1,
@@ -52,3 +52,24 @@ server_parameters:
cpp_vartype: bool
cpp_varname: disableLogicalSessionCacheRefresh
default: false
+
+ TransactionRecordMinimumLifetimeMinutes:
+ # The minimum lifetime for a transaction record is how long it has to have lived on the server
+ # before we'll consider it for cleanup. This is effectively the window for how long it is
+ # permissible for a mongos to hang before we're willing to accept a failure of the retryable
+ # writes subsystem.
+ #
+ # Specifically, we imagine that a client connects to one mongos on a session and performs a
+ # retryable write after which that mongos hangs. Then the client connects to a new mongos on the
+ # same session and successfully executes its write. After a day passes, the session will time
+ # out, cleaning up the retryable write. Then the original mongos wakes up, vivifies the session
+ # and executes the write (because all records of the session + transaction have been deleted).
+ #
+ # So the write is performed twice, which is unavoidable without losing session vivification
+ # and/or requiring synchronized clocks all the way out to the client. In lieu of that we
+ # provide a weaker guarantee after the minimum transaction lifetime.
+ description: The minimum lifetime for a transaction record.
+ set_at: startup
+ cpp_vartype: int
+ cpp_varname: gTransactionRecordMinimumLifetimeMinutes
+ default: 30
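
For orientation, the reap job consumes this generated parameter as a cutoff date. A minimal
sketch of that computation, mirroring the call site in logical_session_cache_impl.cpp further
down in this diff (the helper name is illustrative only):

    // Records last written before this cutoff are candidates for reaping.
    Date_t reapCutoff(OperationContext* opCtx) {
        return opCtx->getServiceContext()->getFastClockSource()->now() -
            Minutes(gTransactionRecordMinimumLifetimeMinutes);
    }
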
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 6610702c1ca..2d9870e258a 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -31,8 +31,6 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/db/logical_session_cache_factory_mongod.h"
#include "mongo/db/logical_session_cache_impl.h"
@@ -41,14 +39,14 @@
#include "mongo/db/sessions_collection_rs.h"
#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/sessions_collection_standalone.h"
-#include "mongo/db/transaction_reaper.h"
+#include "mongo/db/transaction_reaper_d.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCacheServer state) {
- auto liaison = stdx::make_unique<ServiceLiaisonMongod>();
+ auto liaison = std::make_unique<ServiceLiaisonMongod>();
auto sessionsColl = [&]() -> std::shared_ptr<SessionsCollection> {
switch (state) {
@@ -65,22 +63,8 @@ std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheD(LogicalSessionCach
MONGO_UNREACHABLE;
}();
- auto reaper = [&]() -> std::shared_ptr<TransactionReaper> {
- switch (state) {
- case LogicalSessionCacheServer::kSharded:
- return TransactionReaper::make(TransactionReaper::Type::kSharded, sessionsColl);
- case LogicalSessionCacheServer::kReplicaSet:
- return TransactionReaper::make(TransactionReaper::Type::kReplicaSet, sessionsColl);
- case LogicalSessionCacheServer::kConfigServer:
- case LogicalSessionCacheServer::kStandalone:
- return nullptr;
- }
-
- MONGO_UNREACHABLE;
- }();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), std::move(reaper));
+ return std::make_unique<LogicalSessionCacheImpl>(
+ std::move(liaison), std::move(sessionsColl), TransactionReaperD::reapSessionsOlderThan);
}
} // namespace mongo
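
The new transaction_reaper_d.h is not shown in this view. Judging from the call site above and
the ReapSessionsOlderThanFn alias introduced in logical_session_cache_impl.h later in this diff,
it presumably exposes a single static entry point along these lines (a sketch, not the actual
header):

    // Hypothetical sketch of the interface implied by the call site above.
    class TransactionReaperD {
    public:
        // Deletes config.transactions entries whose sessions have expired, considering only
        // records last written before 'possiblyExpired'. Returns the number of records removed.
        static int reapSessionsOlderThan(OperationContext* opCtx,
                                         SessionsCollection& sessionsCollection,
                                         Date_t possiblyExpired);
    };
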
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.cpp b/src/mongo/db/logical_session_cache_factory_mongos.cpp
deleted file mode 100644
index 2f8942c8d85..00000000000
--- a/src/mongo/db/logical_session_cache_factory_mongos.cpp
+++ /dev/null
@@ -1,51 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include <memory>
-
-#include "mongo/db/logical_session_cache_factory_mongos.h"
-
-#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/service_liaison_mongos.h"
-#include "mongo/db/sessions_collection_sharded.h"
-#include "mongo/stdx/memory.h"
-
-namespace mongo {
-
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS() {
- auto liaison = stdx::make_unique<ServiceLiaisonMongos>();
- auto sessionsColl = stdx::make_unique<SessionsCollectionSharded>();
-
- return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
-}
-
-} // namespace mongo
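
With this factory removed, mongos is expected to construct its session cache directly; the
corresponding s/server.cpp hunk is not included in this section, so the following is only a
hedged sketch that reassembles the pieces the deleted function used, paired with a no-op reap
callback like the one the unit tests below pass ('serviceContext' stands for the mongos global
ServiceContext):

    // Sketch only: mongos has no config.transactions collection to clean up, so the reap
    // callback simply reports zero reaped sessions.
    LogicalSessionCache::set(serviceContext,
                             std::make_unique<LogicalSessionCacheImpl>(
                                 std::make_unique<ServiceLiaisonMongos>(),
                                 std::make_shared<SessionsCollectionSharded>(),
                                 [](OperationContext*, SessionsCollection&, Date_t) { return 0; }));
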
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 24d278c9150..2095595eb5f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) {
} // namespace
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(
- std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)) {
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -73,26 +71,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
[this](Client* client) { _periodicRefresh(client); },
Milliseconds(logicalSessionRefreshMillis)});
- if (_transactionReaper) {
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- Milliseconds(logicalSessionRefreshMillis)});
- }
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread, log but swallow the
- // error since there is no good way to recover
- severe() << "Failed to join background service thread";
- }
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
@@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() {
}
size_t LogicalSessionCacheImpl::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
}
@@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) {
}
Status LogicalSessionCacheImpl::_reap(Client* client) {
- if (!_transactionReaper) {
- return Status::OK();
- }
-
// Take the lock to update some stats.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the last set of stats for our new run.
_stats.setLastTransactionReaperJobDurationMillis(0);
@@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
- int numReaped = 0;
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- try {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ int numReaped = 0;
+ try {
ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
@@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- numReaped = _transactionReaper->reap(opCtx);
- } catch (...) {
+ numReaped =
+ _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ opCtx->getServiceContext()->getFastClockSource()->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
- return exceptionToStatus();
+ return ex.toStatus();
}
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
@@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
void LogicalSessionCacheImpl::_refresh(Client* client) {
// Stats for serverStatus:
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the refresh-related stats with the beginning of our run.
_stats.setLastSessionsCollectionJobDurationMillis(0);
@@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = makeGuard([this] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
{
using std::swap;
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
swap(explicitlyEndingSessions, _endingSessions);
swap(activeSessions, _activeSessions);
}
@@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// swapped out of LogicalSessionCache, and merges in any records that had been added since we
// swapped them out.
auto backSwap = [this](auto& member, auto& temp) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
using std::swap;
swap(member, temp);
for (const auto& it : temp) {
@@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
activeSessions.erase(lsid);
}
- // refresh all recently active sessions as well as for sessions attached to running ops
-
- LogicalSessionRecordSet activeSessionRecords{};
+ // Refresh all recently active sessions as well as sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
auto runningOpSessions = _service->getActiveOpSessions();
@@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
}
@@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
}
@@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
// killing cursors on the removed sessions and creating sessions.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
for (const auto& it : _activeSessions) {
auto newSessionIt = openCursorSessions.find(it.first);
@@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
SessionKiller::Matcher matcher(std::move(patterns));
auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
}
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_endingSessions.insert(begin(sessions), end(sessions));
}
LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setActiveSessionsCount(_activeSessions.size());
return _stats;
}
Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
}
+
_activeSessions.insert(std::make_pair(record.getId(), record));
return Status::OK();
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
ret.reserve(_activeSessions.size());
for (const auto& id : _activeSessions) {
@@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
@@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto it = _activeSessions.find(id);
if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
}
+
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index b8c4001ace6..96c7f80d717 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -30,23 +30,12 @@
#pragma once
#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/refresh_sessions_gen.h"
#include "mongo/db/service_liaison.h"
#include "mongo/db/sessions_collection.h"
-#include "mongo/db/time_proof_service.h"
-#include "mongo/db/transaction_reaper.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/stdx/thread.h"
-#include "mongo/util/lru_cache.h"
+#include "mongo/util/functional.h"
namespace mongo {
-class Client;
-class OperationContext;
-class ServiceContext;
-
/**
* A thread-safe cache structure for logical session records.
*
@@ -63,18 +52,20 @@ class ServiceContext;
*/
class LogicalSessionCacheImpl final : public LogicalSessionCache {
public:
- /**
- * Construct a new session cache.
- */
+ using ReapSessionsOlderThanFn =
+ unique_function<int(OperationContext*, SessionsCollection&, Date_t)>;
+
LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper);
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn);
LogicalSessionCacheImpl(const LogicalSessionCacheImpl&) = delete;
LogicalSessionCacheImpl& operator=(const LogicalSessionCacheImpl&) = delete;
~LogicalSessionCacheImpl();
+ void joinOnShutDown() override;
+
Status promote(LogicalSessionId lsid) override;
Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
@@ -126,23 +117,19 @@ private:
*/
Status _addToCache(LogicalSessionRecord record);
- // This value is only modified under the lock, and is modified
- // automatically by the background jobs.
- LogicalSessionCacheStats _stats;
-
std::unique_ptr<ServiceLiaison> _service;
std::shared_ptr<SessionsCollection> _sessionsColl;
+ ReapSessionsOlderThanFn _reapSessionsOlderThanFn;
- mutable stdx::mutex _reaperMutex;
- std::shared_ptr<TransactionReaper> _transactionReaper;
-
- mutable stdx::mutex _cacheMutex;
+ mutable stdx::mutex _mutex;
LogicalSessionIdMap<LogicalSessionRecord> _activeSessions;
LogicalSessionIdSet _endingSessions;
- Date_t lastRefreshTime;
+ Date_t _lastRefreshTime;
+
+ LogicalSessionCacheStats _stats;
};
} // namespace mongo
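
Because ReapSessionsOlderThanFn is a unique_function, both the static member function used by the
mongod factory above and the plain lambdas used by the tests below satisfy it. A small sketch of
the two interchangeable forms:

    // Sketch: two ways of supplying the reap callback seen elsewhere in this diff.
    LogicalSessionCacheImpl::ReapSessionsOlderThanFn reapFn =
        TransactionReaperD::reapSessionsOlderThan;
    LogicalSessionCacheImpl::ReapSessionsOlderThanFn noopReapFn =
        [](OperationContext*, SessionsCollection&, Date_t) { return 0; };
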
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index a81f6ee8299..8b5bb312102 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -42,6 +42,8 @@ class ServiceContext;
*/
class LogicalSessionCacheNoop : public LogicalSessionCache {
public:
+ void joinOnShutDown() override {}
+
Status promote(LogicalSessionId lsid) override {
return Status::OK();
}
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index e7b61b664b0..a5cc6e8eed0 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -79,7 +79,11 @@ public:
auto mockService = stdx::make_unique<MockServiceLiaison>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache = stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(mockService), std::move(mockSessions), nullptr /* reaper */);
+ std::move(mockService),
+ std::move(mockSessions),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ });
}
void waitUntilRefreshScheduled() {
diff --git a/src/mongo/db/logical_session_id_test.cpp b/src/mongo/db/logical_session_id_test.cpp
index d994458332f..160e718201d 100644
--- a/src/mongo/db/logical_session_id_test.cpp
+++ b/src/mongo/db/logical_session_id_test.cpp
@@ -92,7 +92,11 @@ public:
std::make_shared<MockSessionsCollectionImpl>());
auto localLogicalSessionCache = std::make_unique<LogicalSessionCacheImpl>(
- std::move(localServiceLiaison), std::move(localSessionsCollection), nullptr);
+ std::move(localServiceLiaison),
+ std::move(localSessionsCollection),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* No op*/
+ });
LogicalSessionCache::set(getServiceContext(), std::move(localLogicalSessionCache));
}
diff --git a/src/mongo/db/s/SConscript b/src/mongo/db/s/SConscript
index 0fc1b35ee08..54050af3feb 100644
--- a/src/mongo/db/s/SConscript
+++ b/src/mongo/db/s/SConscript
@@ -278,7 +278,7 @@ env.Library(
)
env.Library(
- target='commands_db_s',
+ target='sharding_commands_d',
source=[
'add_shard_cmd.cpp',
'check_sharding_index_command.cpp',
@@ -372,20 +372,25 @@ env.CppUnitTest(
'migration_chunk_cloner_source_legacy_test.cpp',
'migration_destination_manager_test.cpp',
'namespace_metadata_change_notifications_test.cpp',
+ 'session_catalog_migration_destination_test.cpp',
+ 'session_catalog_migration_source_test.cpp',
'shard_metadata_util_test.cpp',
'shard_server_catalog_cache_loader_test.cpp',
'sharding_initialization_mongod_test.cpp',
'sharding_initialization_op_observer_test.cpp',
'split_vector_test.cpp',
],
- LIBDEPS=[
- 'sharding_runtime_d',
+ LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/db/auth/authmocks',
+ '$BUILD_DIR/mongo/db/logical_session_cache_impl',
+ '$BUILD_DIR/mongo/db/ops/write_ops_exec',
'$BUILD_DIR/mongo/db/query/query_request',
+ '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
'$BUILD_DIR/mongo/s/catalog/dist_lock_manager_mock',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
'$BUILD_DIR/mongo/s/shard_server_test_fixture',
+ 'sharding_runtime_d',
],
)
@@ -427,32 +432,6 @@ env.CppUnitTest(
)
env.CppUnitTest(
- target='session_catalog_migration_source_test',
- source=[
- 'session_catalog_migration_source_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/repl/mock_repl_coord_server_fixture',
- 'sharding_runtime_d',
- ]
-)
-
-env.CppUnitTest(
- target='session_catalog_migration_destination_test',
- source=[
- 'session_catalog_migration_destination_test.cpp',
- ],
- LIBDEPS=[
- '$BUILD_DIR/mongo/db/auth/authmocks',
- '$BUILD_DIR/mongo/db/ops/write_ops_exec',
- '$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_mock',
- '$BUILD_DIR/mongo/s/shard_server_test_fixture',
- 'sharding_runtime_d',
- ]
-)
-
-env.CppUnitTest(
target='sharding_catalog_manager_test',
source=[
'config/initial_split_policy_test.cpp',
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index fda30fb70f3..7ffc762201e 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -212,7 +212,7 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N
}
Status SessionsCollection::doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("update", ns.coll());
@@ -230,7 +230,7 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns,
}
Status SessionsCollection::doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("delete", ns.coll());
@@ -245,16 +245,15 @@ Status SessionsCollection::doRemove(const NamespaceString& ns,
return runBulkCmd("deletes", init, add, send, sessions);
}
-StatusWith<LogicalSessionIdSet> SessionsCollection::doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send) {
+StatusWith<LogicalSessionIdSet> SessionsCollection::doFindRemoved(
+ const NamespaceString& ns, const std::vector<LogicalSessionId>& sessions, FindBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionId>{}; };
auto add = [](std::vector<LogicalSessionId>& batch, const LogicalSessionId& record) {
batch.push_back(record);
};
- LogicalSessionIdSet removed = sessions;
+ LogicalSessionIdSet removed{sessions.begin(), sessions.end()};
auto wrappedSend = [&](BSONObj batch) {
auto swBatchResult = send(batch);
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index b8bdfe83ed2..64e78c8a476 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -102,31 +102,34 @@ protected:
* Makes a send function for the given client.
*/
using SendBatchFn = stdx::function<Status(BSONObj batch)>;
- SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
- SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static SendBatchFn makeSendFnForBatchWrite(const NamespaceString& ns, DBClientBase* client);
+
using FindBatchFn = stdx::function<StatusWith<BSONObj>(BSONObj batch)>;
- FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
+ static FindBatchFn makeFindFnForCommand(const NamespaceString& ns, DBClientBase* client);
/**
* Formats and sends batches of refreshes for the given set of sessions.
*/
Status doRefresh(const NamespaceString& ns,
- const LogicalSessionRecordSet& sessions,
+ const std::vector<LogicalSessionRecord>& sessions,
SendBatchFn send);
/**
* Formats and sends batches of deletes for the given set of sessions.
*/
Status doRemove(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
+ const std::vector<LogicalSessionId>& sessions,
SendBatchFn send);
/**
- * Formats and sends batches of fetches for the given set of sessions.
+ * Returns those lsids from the input 'sessions' array which are not present in the sessions
+ * collection (essentially the set difference of 'sessions' minus the contents of the sessions
+ * collection).
*/
- StatusWith<LogicalSessionIdSet> doFetch(const NamespaceString& ns,
- const LogicalSessionIdSet& sessions,
- FindBatchFn send);
+ StatusWith<LogicalSessionIdSet> doFindRemoved(const NamespaceString& ns,
+ const std::vector<LogicalSessionId>& sessions,
+ FindBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index 7abc9dfd1ae..0069d3372f3 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -207,18 +207,20 @@ Status SessionsCollectionRS::checkSessionsCollectionExists(OperationContext* opC
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
+ const std::vector<LogicalSessionRecord> sessionsVector(sessions.begin(), sessions.end());
+
return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, client));
});
@@ -226,18 +228,20 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
return dispatch(NamespaceString::kLogicalSessionsNamespace,
opCtx,
[&] {
DBDirectClient client(opCtx);
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, &client));
},
[&](DBClientBase* client) {
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ sessionsVector,
makeSendFnForBatchWrite(
NamespaceString::kLogicalSessionsNamespace, client));
});
@@ -245,21 +249,24 @@ Status SessionsCollectionRS::removeRecords(OperationContext* opCtx,
StatusWith<LogicalSessionIdSet> SessionsCollectionRS::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
- return dispatch(NamespaceString::kLogicalSessionsNamespace,
- opCtx,
- [&] {
- DBDirectClient client(opCtx);
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(
- NamespaceString::kLogicalSessionsNamespace, &client));
- },
- [&](DBClientBase* client) {
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(
- NamespaceString::kLogicalSessionsNamespace, client));
- });
+ const std::vector<LogicalSessionId> sessionsVector(sessions.begin(), sessions.end());
+
+ return dispatch(
+ NamespaceString::kLogicalSessionsNamespace,
+ opCtx,
+ [&] {
+ DBDirectClient client(opCtx);
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
+ },
+ [&](DBClientBase* client) {
+ return doFindRemoved(
+ NamespaceString::kLogicalSessionsNamespace,
+ sessionsVector,
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, client));
+ });
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index cf1be44431c..e65ddd0dab5 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -40,6 +40,8 @@
#include "mongo/rpc/op_msg.h"
#include "mongo/rpc/op_msg_rpc_impls.h"
#include "mongo/s/catalog_cache.h"
+#include "mongo/s/client/shard.h"
+#include "mongo/s/client/shard_registry.h"
#include "mongo/s/grid.h"
#include "mongo/s/query/cluster_find.h"
#include "mongo/s/write_ops/batch_write_exec.h"
@@ -78,6 +80,60 @@ Status SessionsCollectionSharded::_checkCacheForSessionsCollection(OperationCont
return {ErrorCodes::NamespaceNotFound, "config.system.sessions does not exist"};
}
+std::vector<LogicalSessionId> SessionsCollectionSharded::_groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionId> sessionIdsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionIdsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionId> sessionIdsGroupedByShard;
+ sessionIdsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionIdsByOwningShard) {
+ sessionIdsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionIdsGroupedByShard;
+}
+
+std::vector<LogicalSessionRecord> SessionsCollectionSharded::_groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions) {
+ auto routingInfo = uassertStatusOK(Grid::get(opCtx)->catalogCache()->getCollectionRoutingInfo(
+ opCtx, NamespaceString::kLogicalSessionsNamespace));
+ auto cm = routingInfo.cm();
+
+ uassert(ErrorCodes::NamespaceNotSharded,
+ str::stream() << "Collection " << NamespaceString::kLogicalSessionsNamespace
+ << " is not sharded",
+ cm);
+
+ std::multimap<ShardId, LogicalSessionRecord> sessionsByOwningShard;
+ for (const auto& session : sessions) {
+ sessionsByOwningShard.emplace(
+ cm->findIntersectingChunkWithSimpleCollation(session.getId().toBSON()).getShardId(),
+ session);
+ }
+
+ std::vector<LogicalSessionRecord> sessionRecordsGroupedByShard;
+ sessionRecordsGroupedByShard.reserve(sessions.size());
+ for (auto& session : sessionsByOwningShard) {
+ sessionRecordsGroupedByShard.emplace_back(std::move(session.second));
+ }
+
+ return sessionRecordsGroupedByShard;
+}
+
Status SessionsCollectionSharded::setupSessionsCollection(OperationContext* opCtx) {
return checkSessionsCollectionExists(opCtx);
}
@@ -100,7 +156,9 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return response.toStatus();
};
- return doRefresh(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRefresh(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionRecordsByOwningShard(opCtx, sessions),
+ send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
@@ -117,7 +175,9 @@ Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
return response.toStatus();
};
- return doRemove(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doRemove(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
@@ -163,7 +223,9 @@ StatusWith<LogicalSessionIdSet> SessionsCollectionSharded::findRemovedSessions(
return replyBuilder.releaseBody();
};
- return doFetch(NamespaceString::kLogicalSessionsNamespace, sessions, send);
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ _groupSessionIdsByOwningShard(opCtx, sessions),
+ send);
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index bd377d369bd..95aba591cb7 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -73,6 +73,20 @@ public:
protected:
Status _checkCacheForSessionsCollection(OperationContext* opCtx);
+
+ /**
+ * These two methods use the sharding routing metadata to make a best-effort attempt at grouping
+ * the specified set of sessions by the shards which own their records. This is done in order to
+ * avoid broadcast queries.
+ *
+ * The grouping is 'best effort' because it does not check whether the routing table is
+ * up-to-date; it simply uses whatever was most recently fetched from the config server, which
+ * could be stale.
+ */
+ std::vector<LogicalSessionId> _groupSessionIdsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions);
+ std::vector<LogicalSessionRecord> _groupSessionRecordsByOwningShard(
+ OperationContext* opCtx, const LogicalSessionRecordSet& sessions);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index 32e93491fbf..25ae4db3dfa 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -100,7 +100,7 @@ Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
const LogicalSessionRecordSet& sessions) {
DBDirectClient client(opCtx);
return doRefresh(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ std::vector(sessions.begin(), sessions.end()),
makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
@@ -108,16 +108,16 @@ Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
return doRemove(NamespaceString::kLogicalSessionsNamespace,
- sessions,
+ std::vector(sessions.begin(), sessions.end()),
makeSendFnForBatchWrite(NamespaceString::kLogicalSessionsNamespace, &client));
}
StatusWith<LogicalSessionIdSet> SessionsCollectionStandalone::findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
DBDirectClient client(opCtx);
- return doFetch(NamespaceString::kLogicalSessionsNamespace,
- sessions,
- makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
+ return doFindRemoved(NamespaceString::kLogicalSessionsNamespace,
+ std::vector(sessions.begin(), sessions.end()),
+ makeFindFnForCommand(NamespaceString::kLogicalSessionsNamespace, &client));
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
deleted file mode 100644
index e1ac967ef59..00000000000
--- a/src/mongo/db/transaction_reaper.cpp
+++ /dev/null
@@ -1,289 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/db/transaction_reaper.h"
-
-#include "mongo/bson/bsonmisc.h"
-#include "mongo/db/dbdirectclient.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/repl/replication_coordinator.h"
-#include "mongo/db/session_txn_record_gen.h"
-#include "mongo/db/sessions_collection.h"
-#include "mongo/db/transaction_reaper_gen.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/s/catalog_cache.h"
-#include "mongo/s/client/shard.h"
-#include "mongo/s/client/shard_registry.h"
-#include "mongo/s/grid.h"
-#include "mongo/s/write_ops/batched_command_response.h"
-#include "mongo/util/scopeguard.h"
-
-namespace mongo {
-namespace {
-
-const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
-const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
-
-/**
- * Makes the query we'll use to scan the transactions table.
- *
- * Scans for records older than the minimum lifetime and uses a sort to walk the index and attempt
- * to pull records likely to be on the same chunks (because they sort near each other).
- */
-Query makeQuery(Date_t now) {
- const Date_t possiblyExpired(now - Minutes(gTransactionRecordMinimumLifetimeMinutes));
- Query query(BSON(kLastWriteDateFieldName << LT << possiblyExpired));
- query.sort(kSortById);
- return query;
-}
-
-/**
- * Our impl is templatized on a type which handles the lsids we see. It provides the top level
- * scaffolding for figuring out if we're the primary node responsible for the transaction table and
- * invoking the handler.
- *
- * The handler here will see all of the possibly expired txn ids in the transaction table and will
- * have a lifetime associated with a single call to reap.
- */
-template <typename Handler>
-class TransactionReaperImpl final : public TransactionReaper {
-public:
- TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
- : _collection(std::move(collection)) {}
-
- int reap(OperationContext* opCtx) override {
- Handler handler(opCtx, *_collection);
- if (!handler.initialize()) {
- return 0;
- }
-
- // Make a best-effort attempt to only reap when the node is running as a primary
- const auto coord = mongo::repl::ReplicationCoordinator::get(opCtx);
- if (!coord->canAcceptWritesForDatabase_UNSAFE(
- opCtx, NamespaceString::kSessionTransactionsTableNamespace.db())) {
- return 0;
- }
-
- DBDirectClient client(opCtx);
-
- // Fill all stale config.transactions entries
- auto query = makeQuery(opCtx->getServiceContext()->getFastClockSource()->now());
- auto cursor = client.query(
- NamespaceString::kSessionTransactionsTableNamespace, query, 0, 0, &kIdProjection);
-
- while (cursor->more()) {
- auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
- "TransactionSession"_sd, cursor->next());
-
- handler.handleLsid(transactionSession.get_id());
- }
-
- // Before the handler goes out of scope, flush its last batch to disk and collect stats.
- return handler.finalize();
- }
-
-private:
- std::shared_ptr<SessionsCollection> _collection;
-};
-
-/**
- * Removes the specified set of session ids from the persistent sessions collection and returns the
- * number of sessions actually removed.
- */
-int removeSessionsTransactionRecords(OperationContext* opCtx,
- SessionsCollection& sessionsCollection,
- const LogicalSessionIdSet& sessionIdsToRemove) {
- if (sessionIdsToRemove.empty()) {
- return 0;
- }
-
- // From the passed-in sessions, find the ones which are actually expired/removed
- auto expiredSessionIds =
- uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
-
- DBDirectClient client(opCtx);
- int numDeleted = 0;
-
- for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end();) {
- write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
- deleteOp.setWriteCommandBase([] {
- write_ops::WriteCommandBase base;
- base.setOrdered(false);
- return base;
- }());
- deleteOp.setDeletes([&] {
- // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
- // size limit
- const int kMaxBatchSize = 10'000;
- std::vector<write_ops::DeleteOpEntry> entries;
- for (; it != expiredSessionIds.end() && entries.size() < kMaxBatchSize; ++it) {
- entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()),
- false /* multi = false */);
- }
- return entries;
- }());
-
- BSONObj result;
- client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
- deleteOp.toBSON({}),
- result);
-
- BatchedCommandResponse response;
- std::string errmsg;
- uassert(ErrorCodes::FailedToParse,
- str::stream() << "Failed to parse response " << result,
- response.parseBSON(result, &errmsg));
- uassertStatusOK(response.getTopLevelStatus());
-
- numDeleted += response.getN();
- }
-
- return numDeleted;
-}
-
-/**
- * The repl impl is simple, just pass along to the sessions collection for checking ids locally
- */
-class ReplHandler {
-public:
- ReplHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- bool initialize() {
- return true;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- _batch.insert(lsid);
-
- if (_batch.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- _batch.clear();
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, _batch);
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- LogicalSessionIdSet _batch;
-
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-/**
- * The sharding impl is a little fancier. Try to bucket by shard id, to avoid doing repeated small
- * scans.
- */
-class ShardedHandler {
-public:
- ShardedHandler(OperationContext* opCtx, SessionsCollection& sessionsCollection)
- : _opCtx(opCtx), _sessionsCollection(sessionsCollection) {}
-
- // Returns false if the sessions collection is not set up.
- bool initialize() {
- auto routingInfo =
- uassertStatusOK(Grid::get(_opCtx)->catalogCache()->getCollectionRoutingInfo(
- _opCtx, NamespaceString::kLogicalSessionsNamespace));
- _cm = routingInfo.cm();
- return !!_cm;
- }
-
- void handleLsid(const LogicalSessionId& lsid) {
- invariant(_cm);
-
- // This code attempts to group requests to 'removeSessionsTransactionRecords' to contain
- // batches of lsids, which only fall on the same shard, so that the query to check whether
- // they are alive doesn't need to do cross-shard scatter/gather queries
- const auto chunk = _cm->findIntersectingChunkWithSimpleCollation(lsid.toBSON());
- const auto& shardId = chunk.getShardId();
-
- auto& lsids = _shards[shardId];
- lsids.insert(lsid);
-
- if (lsids.size() >= write_ops::kMaxWriteBatchSize) {
- _numReaped += removeSessionsTransactionRecords(_opCtx, _sessionsCollection, lsids);
- _shards.erase(shardId);
- }
- }
-
- int finalize() {
- invariant(!_finalized);
- _finalized = true;
-
- for (const auto& pair : _shards) {
- _numReaped +=
- removeSessionsTransactionRecords(_opCtx, _sessionsCollection, pair.second);
- }
-
- return _numReaped;
- }
-
-private:
- OperationContext* const _opCtx;
- SessionsCollection& _sessionsCollection;
-
- std::shared_ptr<ChunkManager> _cm;
-
- stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
- int _numReaped{0};
-
- bool _finalized{false};
-};
-
-} // namespace
-
-std::unique_ptr<TransactionReaper> TransactionReaper::make(
- Type type, std::shared_ptr<SessionsCollection> collection) {
- switch (type) {
- case Type::kReplicaSet:
- return stdx::make_unique<TransactionReaperImpl<ReplHandler>>(std::move(collection));
- case Type::kSharded:
- return stdx::make_unique<TransactionReaperImpl<ShardedHandler>>(std::move(collection));
- }
- MONGO_UNREACHABLE;
-}
-
-TransactionReaper::~TransactionReaper() = default;
-
-} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h
deleted file mode 100644
index 4c3c252e634..00000000000
--- a/src/mongo/db/transaction_reaper.h
+++ /dev/null
@@ -1,63 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#pragma once
-
-#include <memory>
-
-namespace mongo {
-
-class ServiceContext;
-class SessionsCollection;
-class OperationContext;
-
-/**
- * TransactionReaper is responsible for scanning the transaction table, checking if sessions are
- * still alive and deleting the transaction records if their sessions have expired.
- */
-class TransactionReaper {
-public:
- enum class Type {
- kReplicaSet,
- kSharded,
- };
-
- virtual ~TransactionReaper() = 0;
-
- virtual int reap(OperationContext* OperationContext) = 0;
-
- /**
- * The implementation of the sessions collections is different in replica sets versus sharded
- * clusters, so we have a factory to pick the right impl.
- */
- static std::unique_ptr<TransactionReaper> make(Type type,
- std::shared_ptr<SessionsCollection> collection);
-};
-
-} // namespace mongo
diff --git a/src/mongo/db/transaction_reaper.idl b/src/mongo/db/transaction_reaper.idl
deleted file mode 100644
index 7146d869a3f..00000000000
--- a/src/mongo/db/transaction_reaper.idl
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (C) 2019-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-global:
- cpp_namespace: "mongo"
-
-server_parameters:
- TransactionRecordMinimumLifetimeMinutes:
- # The minimum lifetime for a transaction record is how long it has to have lived on the server
- # before we'll consider it for cleanup. This is effectively the window for how long it is
- # permissible for a mongos to hang before we're willing to accept a failure of the retryable write
- # subsystem.
- #
- # Specifically, we imagine that a client connects to one mongos on a session and performs a
- # retryable write. That mongos hangs. Then the client connects to a new mongos on the same
- # session and successfully executes its write. After a day passes, the session will time out,
- # cleaning up the retryable write. Then the original mongos wakes up, vivifies the session and
- # executes the write (because all records of the session + transaction have been deleted).
- #
- # So the write is performed twice, which is unavoidable without losing session vivification and/or
- # requiring synchronized clocks all the way out to the client. In lieu of that we provide a weaker
- # guarantee after the minimum transaction lifetime.
- description: 'The minimum lifetime for a transaction record.'
- set_at: startup
- cpp_vartype: int
- cpp_varname: gTransactionRecordMinimumLifetimeMinutes
- default: 30
diff --git a/src/mongo/db/transaction_reaper_d.cpp b/src/mongo/db/transaction_reaper_d.cpp
new file mode 100644
index 00000000000..1370c60c4f6
--- /dev/null
+++ b/src/mongo/db/transaction_reaper_d.cpp
@@ -0,0 +1,133 @@
+/**
+ * Copyright (C) 2019-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/transaction_reaper_d.h"
+
+#include "mongo/bson/bsonmisc.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/ops/write_ops.h"
+#include "mongo/db/session_txn_record_gen.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/s/write_ops/batched_command_response.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+namespace {
+
+const auto kIdProjection = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kSortById = BSON(SessionTxnRecord::kSessionIdFieldName << 1);
+const auto kLastWriteDateFieldName = SessionTxnRecord::kLastWriteDateFieldName;
+
+/**
+ * Removes the config.transactions records for the specified session ids, provided those sessions
+ * have already been removed from the sessions collection, and returns the number of records
+ * actually removed.
+ */
+int removeSessionsTransactionRecords(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ const LogicalSessionIdSet& sessionIdsToRemove) {
+ if (sessionIdsToRemove.empty()) {
+ return 0;
+ }
+
+ // From the passed-in sessions, find the ones which are actually expired/removed
+ auto expiredSessionIds =
+ uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, sessionIdsToRemove));
+
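+ // Issue a single unordered batch delete against config.transactions. The generated command
+ // roughly has the shape:
+ //   { delete: "transactions", ordered: false, deletes: [ { q: { _id: <lsid> }, limit: 1 }, ... ] }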
+ write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
+ deleteOp.setWriteCommandBase([] {
+ write_ops::WriteCommandBase base;
+ base.setOrdered(false);
+ return base;
+ }());
+ deleteOp.setDeletes([&] {
+ std::vector<write_ops::DeleteOpEntry> entries;
+ for (auto it = expiredSessionIds.begin(); it != expiredSessionIds.end(); ++it) {
+ entries.emplace_back(BSON(LogicalSessionRecord::kIdFieldName << it->toBSON()),
+ false /* multi = false */);
+ }
+ return entries;
+ }());
+
+ BSONObj result;
+
+ DBDirectClient client(opCtx);
+ client.runCommand(NamespaceString::kSessionTransactionsTableNamespace.db().toString(),
+ deleteOp.toBSON({}),
+ result);
+
+ BatchedCommandResponse response;
+ std::string errmsg;
+ uassert(ErrorCodes::FailedToParse,
+ str::stream() << "Failed to parse response " << result,
+ response.parseBSON(result, &errmsg));
+ uassertStatusOK(response.getTopLevelStatus());
+
+ return response.getN();
+}
+
+} // namespace
+
+int TransactionReaperD::reapSessionsOlderThan(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired) {
+ // Scan for records older than the minimum lifetime, using a sort to walk the '_id' index
+ DBDirectClient client(opCtx);
+ auto cursor =
+ client.query(NamespaceString::kSessionTransactionsTableNamespace,
+ Query(BSON(kLastWriteDateFieldName << LT << possiblyExpired)).sort(kSortById),
+ 0,
+ 0,
+ &kIdProjection);
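+ // Roughly the shell equivalent:
+ //   db.getSiblingDB("config").transactions.find(
+ //       { lastWriteDate: { $lt: <possiblyExpired> } }, { _id: 1 } ).sort( { _id: 1 } )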
+
+ // The max batch size is chosen so that a single batch won't exceed the 16MB BSON object
+ // size limit
+ LogicalSessionIdSet lsids;
+ const int kMaxBatchSize = 10'000;
+
+ int numReaped = 0;
+ while (cursor->more()) {
+ auto transactionSession = SessionsCollectionFetchResultIndividualResult::parse(
+ "TransactionSession"_sd, cursor->next());
+
+ lsids.insert(transactionSession.get_id());
+ if (lsids.size() > kMaxBatchSize) {
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+ lsids.clear();
+ }
+ }
+
+ numReaped += removeSessionsTransactionRecords(opCtx, sessionsCollection, lsids);
+
+ return numReaped;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_factory_mongos.h b/src/mongo/db/transaction_reaper_d.h
index 739c633631d..057583b0ec9 100644
--- a/src/mongo/db/logical_session_cache_factory_mongos.h
+++ b/src/mongo/db/transaction_reaper_d.h
@@ -1,5 +1,5 @@
/**
- * Copyright (C) 2018-present MongoDB, Inc.
+ * Copyright (C) 2019-present MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the Server Side Public License, version 1,
@@ -29,13 +29,18 @@
#pragma once
-#include <memory>
-
-#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/service_liaison.h"
+#include "mongo/util/time_support.h"
namespace mongo {
-std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheS();
+class SessionsCollection;
+class OperationContext;
+
+class TransactionReaperD {
+public:
+ static int reapSessionsOlderThan(OperationContext* opCtx,
+ SessionsCollection& sessionsCollection,
+ Date_t possiblyExpired);
+};
} // namespace mongo
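With the reaper reduced to a static helper, a node that actually stores config.transactions can wire
it into LogicalSessionCacheImpl as the reaper callback. A minimal sketch, assuming the cache's third
constructor argument accepts any callable with the same (OperationContext*, SessionsCollection&,
Date_t) -> int signature as the no-op lambdas used for mongos and the embedded server below; the
liaison and sessions-collection names here are illustrative, not the literal factory code:

    LogicalSessionCache::set(serviceContext,
                             stdx::make_unique<LogicalSessionCacheImpl>(
                                 stdx::make_unique<ServiceLiaisonMongod>(),
                                 std::move(sessionsColl),
                                 [](OperationContext* opCtx,
                                    SessionsCollection& sessionsCollection,
                                    Date_t possiblyExpired) {
                                     return TransactionReaperD::reapSessionsOlderThan(
                                         opCtx, sessionsCollection, possiblyExpired);
                                 }));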
diff --git a/src/mongo/db/transaction_reaper_test.cpp b/src/mongo/db/transaction_reaper_d_test.cpp
index 546ba58568a..165ed3597a0 100644
--- a/src/mongo/db/transaction_reaper_test.cpp
+++ b/src/mongo/db/transaction_reaper_d_test.cpp
@@ -34,7 +34,7 @@
#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection_mock.h"
-#include "mongo/db/transaction_reaper.h"
+#include "mongo/db/transaction_reaper_d.h"
#include "mongo/util/clock_source_mock.h"
namespace mongo {
@@ -61,15 +61,12 @@ protected:
std::shared_ptr<MockSessionsCollectionImpl> _collectionMock{
std::make_shared<MockSessionsCollectionImpl>()};
- std::unique_ptr<TransactionReaper> _reaper{
- TransactionReaper::make(TransactionReaper::Type::kReplicaSet,
- std::make_shared<MockSessionsCollection>(_collectionMock))};
+ std::shared_ptr<SessionsCollection> _collection{
+ std::make_shared<MockSessionsCollection>(_collectionMock)};
};
TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
- _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
- _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
-
+ // Create some "old" sessions
DBDirectClient client(_opCtx);
SessionTxnRecord txn1(
makeLogicalSessionIdForTest(), 100, repl::OpTime(Timestamp(100), 1), clock()->now());
@@ -79,8 +76,15 @@ TEST_F(TransactionReaperTest, ReapSomeExpiredSomeNot) {
client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(),
std::vector{txn1.toBSON(), txn2.toBSON()});
+ // Add some "new" sessions to ensure they don't get reaped
clock()->advance(Minutes{31});
- ASSERT_EQ(2, _reaper->reap(_opCtx));
+ _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
+ _collectionMock->add(LogicalSessionRecord(makeLogicalSessionIdForTest(), clock()->now()));
+
+ ASSERT_EQ(2,
+ TransactionReaperD::reapSessionsOlderThan(
+ _opCtx, *_collection, clock()->now() - Minutes{30}));
}
} // namespace
diff --git a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
index ae9f55f6ae8..fa606b87c3f 100644
--- a/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
+++ b/src/mongo/embedded/logical_session_cache_factory_embedded.cpp
@@ -31,8 +31,6 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
#include "mongo/embedded/logical_session_cache_factory_embedded.h"
#include "mongo/db/logical_session_cache_impl.h"
@@ -46,11 +44,14 @@ namespace mongo {
std::unique_ptr<LogicalSessionCache> makeLogicalSessionCacheEmbedded() {
auto liaison = std::make_unique<ServiceLiaisonMongod>();
- // Set up the logical session cache
auto sessionsColl = std::make_shared<SessionsCollectionStandalone>();
return stdx::make_unique<LogicalSessionCacheImpl>(
- std::move(liaison), std::move(sessionsColl), nullptr /* reaper */);
+ std::move(liaison),
+ std::move(sessionsColl),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* no-op */
+ });
}
} // namespace mongo
diff --git a/src/mongo/s/SConscript b/src/mongo/s/SConscript
index 23a1e9a5019..88db8654426 100644
--- a/src/mongo/s/SConscript
+++ b/src/mongo/s/SConscript
@@ -106,7 +106,6 @@ env.Library(
'client/sharding_network_connection_hook.cpp',
],
LIBDEPS=[
- '$BUILD_DIR/mongo/db/logical_session_cache_factory_mongos',
'$BUILD_DIR/mongo/s/catalog/dist_lock_catalog_impl',
'$BUILD_DIR/mongo/s/catalog/replset_dist_lock_manager',
'$BUILD_DIR/mongo/s/catalog/sharding_catalog_client_impl',
diff --git a/src/mongo/s/server.cpp b/src/mongo/s/server.cpp
index 29c490ba8b0..6580faeb0c5 100644
--- a/src/mongo/s/server.cpp
+++ b/src/mongo/s/server.cpp
@@ -55,13 +55,15 @@
#include "mongo/db/lasterror.h"
#include "mongo/db/log_process_details.h"
#include "mongo/db/logical_clock.h"
-#include "mongo/db/logical_session_cache_factory_mongos.h"
+#include "mongo/db/logical_session_cache_impl.h"
#include "mongo/db/logical_time_metadata_hook.h"
#include "mongo/db/logical_time_validator.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_options.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/service_liaison_mongos.h"
#include "mongo/db/session_killer.h"
+#include "mongo/db/sessions_collection_sharded.h"
#include "mongo/db/startup_warnings_common.h"
#include "mongo/db/wire_version.h"
#include "mongo/executor/task_executor_pool.h"
@@ -84,7 +86,6 @@
#include "mongo/s/query/cluster_cursor_manager.h"
#include "mongo/s/service_entry_point_mongos.h"
#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
-#include "mongo/s/sharding_egress_metadata_hook_for_mongos.h"
#include "mongo/s/sharding_initialization.h"
#include "mongo/s/sharding_uptime_reporter.h"
#include "mongo/s/version_mongos.h"
@@ -187,6 +188,9 @@ void cleanupTask(ServiceContext* serviceContext) {
Client::initThread(getThreadName());
Client& client = cc();
+ // Join the logical session cache before the transport layer
+ LogicalSessionCache::get(serviceContext)->joinOnShutDown();
+
// Shutdown the TransportLayer so that new connections aren't accepted
if (auto tl = serviceContext->getTransportLayer()) {
log(LogComponent::kNetwork) << "shutdown: going to close all sockets...";
@@ -496,8 +500,13 @@ ExitCode runMongosServer(ServiceContext* serviceContext) {
SessionKiller::set(serviceContext,
std::make_shared<SessionKiller>(serviceContext, killSessionsRemote));
- // Set up the logical session cache
- LogicalSessionCache::set(serviceContext, makeLogicalSessionCacheS());
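+ // Mongos keeps no config.transactions records of its own, so the reaper callback is a no-op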
+ LogicalSessionCache::set(serviceContext,
+ stdx::make_unique<LogicalSessionCacheImpl>(
+ stdx::make_unique<ServiceLiaisonMongos>(),
+ stdx::make_unique<SessionsCollectionSharded>(),
+ [](OperationContext*, SessionsCollection&, Date_t) {
+ return 0; /* no-op */
+ }));
status = serviceContext->getServiceExecutor()->start();
if (!status.isOK()) {