author     Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-05-08 18:14:07 -0400
committer  Kaloian Manassiev <kaloian.manassiev@mongodb.com>   2019-05-17 07:18:50 -0400
commit     aa9f6a202e0709adf14046cb27504864adaf732b (patch)
tree       144df4c5076284b611cf67ff54d63f2190a3a037 /src/mongo/db
parent     1d05d4b956221cfeb731892a4387d1470f999114 (diff)
download   mongo-aa9f6a202e0709adf14046cb27504864adaf732b.tar.gz
SERVER-37837 Examine and reap sessions from the SessionsCatalog
This change makes the logical sessions cache query and reap sessions that may exist only in memory in the SessionCatalog.
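
As a rough orientation for the change below, the reaping idea can be sketched with nothing but the standard library: pick in-memory entries whose last use is older than a cutoff, ask the authoritative sessions store which of those no longer exist, and erase only the confirmed ones. The types and the findRemovedSessions callback here are illustrative stand-ins, not MongoDB's API.

#include <chrono>
#include <functional>
#include <map>
#include <set>
#include <string>

using Clock = std::chrono::steady_clock;

struct InMemorySession {
    Clock::time_point lastUse;  // refreshed every time the session is used
};

class InMemorySessionCatalog {
public:
    void touch(const std::string& lsid) {
        _sessions[lsid].lastUse = Clock::now();
    }

    // Reaps sessions not used since 'cutoff' that the authoritative store
    // (modelled as a callback) reports as removed. Returns how many entries
    // were erased from the in-memory map.
    size_t reapOlderThan(
        Clock::time_point cutoff,
        const std::function<std::set<std::string>(const std::set<std::string>&)>& findRemovedSessions) {
        std::set<std::string> candidates;
        for (const auto& [lsid, session] : _sessions) {
            if (session.lastUse < cutoff)
                candidates.insert(lsid);
        }

        size_t numReaped = 0;
        for (const auto& lsid : findRemovedSessions(candidates))
            numReaped += _sessions.erase(lsid);
        return numReaped;
    }

private:
    std::map<std::string, InMemorySession> _sessions;
};
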
Diffstat (limited to 'src/mongo/db')
-rw-r--r--  src/mongo/db/SConscript | 2
-rw-r--r--  src/mongo/db/commands/SConscript | 73
-rw-r--r--  src/mongo/db/commands/logical_session_server_status_section.cpp (renamed from src/mongo/db/logical_session_server_status_section.cpp) | 14
-rw-r--r--  src/mongo/db/commands/reap_logical_session_cache_now.cpp | 13
-rw-r--r--  src/mongo/db/commands/refresh_logical_session_cache_now.cpp | 14
-rw-r--r--  src/mongo/db/commands/refresh_sessions_command.cpp | 22
-rw-r--r--  src/mongo/db/repl/repl_set_commands.cpp | 39
-rw-r--r--  src/mongo/db/s/transaction_coordinator_service.cpp | 9
-rw-r--r--  src/mongo/db/session.h | 3
-rw-r--r--  src/mongo/db/session_catalog.cpp | 65
-rw-r--r--  src/mongo/db/session_catalog.h | 38
-rw-r--r--  src/mongo/db/session_catalog_mongod.cpp | 37
-rw-r--r--  src/mongo/db/session_catalog_mongod.h | 2
-rw-r--r--  src/mongo/db/session_catalog_test.cpp | 102
-rw-r--r--  src/mongo/db/sessions_collection.cpp | 7
-rw-r--r--  src/mongo/db/sessions_collection.h | 5
-rw-r--r--  src/mongo/db/transaction_participant.cpp | 12
17 files changed, 333 insertions, 124 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index c6fee300b60..de9d62a0e49 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1529,7 +1529,6 @@ env.Library(
source=[
'initialize_operation_session_info.cpp',
'logical_session_cache_impl.cpp',
- 'logical_session_server_status_section.cpp',
],
LIBDEPS=[
'logical_session_cache',
@@ -1540,7 +1539,6 @@ env.Library(
'kill_sessions',
],
LIBDEPS_PRIVATE=[
- '$BUILD_DIR/mongo/db/commands/server_status',
'$BUILD_DIR/mongo/db/s/sharding_api_d',
]
)
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 8c7fc3639ad..2d1be53c64f 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -91,21 +91,21 @@ env.Library(
# Commands available in every process that executes commands
env.Library(
- target="core",
+ target='core',
source=[
- "end_sessions_command.cpp",
- "fail_point_cmd.cpp",
- "find_and_modify_common.cpp",
- "generic.cpp",
- "hashcmd.cpp",
- "kill_all_sessions_by_pattern_command.cpp",
- "kill_all_sessions_command.cpp",
- "kill_sessions_command.cpp",
- "parameters.cpp",
- "refresh_logical_session_cache_now.cpp",
- "refresh_sessions_command.cpp",
- "rename_collection_common.cpp",
- "start_session_command.cpp",
+ 'end_sessions_command.cpp',
+ 'fail_point_cmd.cpp',
+ 'find_and_modify_common.cpp',
+ 'generic.cpp',
+ 'hashcmd.cpp',
+ 'kill_all_sessions_by_pattern_command.cpp',
+ 'kill_all_sessions_command.cpp',
+ 'kill_sessions_command.cpp',
+ 'parameters.cpp',
+ 'refresh_logical_session_cache_now.cpp',
+ 'refresh_sessions_command.cpp',
+ 'rename_collection_common.cpp',
+ 'start_session_command.cpp',
env.Idlc('end_sessions.idl')[0],
env.Idlc('parameters.idl')[0],
],
@@ -114,38 +114,39 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/auth',
'$BUILD_DIR/mongo/db/auth/authprivilege',
'$BUILD_DIR/mongo/db/commands',
- '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/kill_sessions',
- '$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
- '$BUILD_DIR/mongo/db/logical_session_id',
+ '$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
+ '$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/mongohasher',
'$BUILD_DIR/mongo/db/server_options_core',
'$BUILD_DIR/mongo/idl/server_parameter',
'$BUILD_DIR/mongo/logger/parse_log_component_settings',
'$BUILD_DIR/mongo/rpc/protocol',
+ 'test_commands_enabled',
],
)
# Commands available in all mongodb server processes (mongod, mongos, etc.)
env.Library(
- target="servers",
+ target='servers',
source=[
- "authentication_commands.cpp",
- "conn_pool_stats.cpp",
- "conn_pool_sync.cpp",
- "connection_status.cpp",
- "drop_connections_command.cpp",
+ 'authentication_commands.cpp',
+ 'conn_pool_stats.cpp',
+ 'conn_pool_sync.cpp',
+ 'connection_status.cpp',
+ 'drop_connections_command.cpp',
+ 'generic_servers.cpp',
+ 'isself.cpp',
+ 'logical_session_server_status_section.cpp',
+ 'mr_common.cpp',
+ 'reap_logical_session_cache_now.cpp',
+ 'refresh_sessions_command_internal.cpp',
+ 'traffic_recording_cmds.cpp',
+ 'user_management_commands_common.cpp',
env.Idlc('drop_connections.idl')[0],
- "generic_servers.cpp",
- "isself.cpp",
- "mr_common.cpp",
- "reap_logical_session_cache_now.cpp",
- "refresh_sessions_command_internal.cpp",
- "traffic_recording_cmds.cpp",
- "user_management_commands_common.cpp",
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/client/clientdriver_minimal',
@@ -153,25 +154,27 @@ env.Library(
'$BUILD_DIR/mongo/db/auth/sasl_options',
'$BUILD_DIR/mongo/db/auth/user_document_parser',
'$BUILD_DIR/mongo/db/commands',
- '$BUILD_DIR/mongo/db/commands/test_commands_enabled',
'$BUILD_DIR/mongo/db/common',
'$BUILD_DIR/mongo/db/log_process_details',
- '$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_cache_impl',
- '$BUILD_DIR/mongo/db/logical_session_id',
+ '$BUILD_DIR/mongo/db/logical_session_cache',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
+ '$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/repl/isself',
'$BUILD_DIR/mongo/db/repl/repl_coordinator_interface',
- '$BUILD_DIR/mongo/executor/egress_tag_closer_manager',
+ '$BUILD_DIR/mongo/db/session_catalog',
'$BUILD_DIR/mongo/db/traffic_recorder',
+ '$BUILD_DIR/mongo/executor/egress_tag_closer_manager',
'$BUILD_DIR/mongo/executor/task_executor_pool',
'$BUILD_DIR/mongo/rpc/client_metadata',
- '$BUILD_DIR/mongo/s/sharding_legacy_api',
'$BUILD_DIR/mongo/s/coreshard',
+ '$BUILD_DIR/mongo/s/sharding_legacy_api',
'$BUILD_DIR/mongo/scripting/scripting_common',
'$BUILD_DIR/mongo/util/ntservice',
'core',
'feature_compatibility_parsers',
+ 'server_status',
+ 'test_commands_enabled',
]
)
diff --git a/src/mongo/db/logical_session_server_status_section.cpp b/src/mongo/db/commands/logical_session_server_status_section.cpp
index 971748f7de6..b23605a5506 100644
--- a/src/mongo/db/logical_session_server_status_section.cpp
+++ b/src/mongo/db/commands/logical_session_server_status_section.cpp
@@ -35,14 +35,11 @@
#include "mongo/db/operation_context.h"
namespace mongo {
-
namespace {
-class LogicalSessionSSS : public ServerStatusSection {
+class LogicalSessionServerStatusSection : public ServerStatusSection {
public:
- LogicalSessionSSS() : ServerStatusSection("logicalSessionRecordCache") {}
-
- ~LogicalSessionSSS() override = default;
+ LogicalSessionServerStatusSection() : ServerStatusSection("logicalSessionRecordCache") {}
bool includeByDefault() const override {
return true;
@@ -50,11 +47,12 @@ public:
BSONObj generateSection(OperationContext* opCtx,
const BSONElement& configElement) const override {
- auto lsCache = LogicalSessionCache::get(opCtx);
- return lsCache ? lsCache->getStats().toBSON() : BSONObj();
+ const auto logicalSessionCache = LogicalSessionCache::get(opCtx);
+
+ return logicalSessionCache ? logicalSessionCache->getStats().toBSON() : BSONObj();
}
-} LogicalSessionSSS;
+} logicalSessionsServerStatusSection;
} // namespace
} // namespace mongo
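
The renamed section keeps the usual self-registration idiom: a static instance at namespace scope whose constructor adds the section to a registry. A stripped-down, standalone version of that idiom (the StatusSection registry below is an illustrative stand-in, not MongoDB's ServerStatusSection API):

#include <iostream>
#include <map>
#include <string>

// Illustrative registry: each section registers itself from its constructor.
class StatusSection {
public:
    explicit StatusSection(std::string name) : _name(std::move(name)) {
        registry()[_name] = this;
    }
    virtual ~StatusSection() {
        registry().erase(_name);
    }

    virtual std::string generate() const = 0;

    static std::map<std::string, StatusSection*>& registry() {
        static std::map<std::string, StatusSection*> instance;
        return instance;
    }

private:
    std::string _name;
};

namespace {
class LogicalSessionRecordCacheSection : public StatusSection {
public:
    LogicalSessionRecordCacheSection() : StatusSection("logicalSessionRecordCache") {}

    std::string generate() const override {
        return "{activeSessionsCount: 0}";  // placeholder payload
    }
} logicalSessionRecordCacheSection;  // the static instance performs the registration
}  // namespace

int main() {
    for (const auto& [name, section] : StatusSection::registry())
        std::cout << name << ": " << section->generate() << "\n";
}
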
diff --git a/src/mongo/db/commands/reap_logical_session_cache_now.cpp b/src/mongo/db/commands/reap_logical_session_cache_now.cpp
index 8379169e84e..e60e0fb28fd 100644
--- a/src/mongo/db/commands/reap_logical_session_cache_now.cpp
+++ b/src/mongo/db/commands/reap_logical_session_cache_now.cpp
@@ -37,13 +37,9 @@
#include "mongo/db/operation_context.h"
namespace mongo {
-
namespace {
class ReapLogicalSessionCacheNowCommand final : public BasicCommand {
- ReapLogicalSessionCacheNowCommand(const ReapLogicalSessionCacheNowCommand&) = delete;
- ReapLogicalSessionCacheNowCommand& operator=(const ReapLogicalSessionCacheNowCommand&) = delete;
-
public:
ReapLogicalSessionCacheNowCommand() : BasicCommand("reapLogicalSessionCacheNow") {}
@@ -70,10 +66,10 @@ public:
return Status::OK();
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
auto cache = LogicalSessionCache::get(opCtx);
auto client = opCtx->getClient();
@@ -87,5 +83,4 @@ public:
MONGO_REGISTER_TEST_COMMAND(ReapLogicalSessionCacheNowCommand);
} // namespace
-
} // namespace mongo
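
The explicitly deleted copy operations removed above add little: when a command type derives from a non-copyable base, its own copy operations are implicitly deleted anyway. A small standalone illustration of that rule, where NonCopyableCommand is an assumed stand-in rather than MongoDB's BasicCommand:

#include <string>
#include <type_traits>

// Stand-in for a command base class that forbids copying.
class NonCopyableCommand {
public:
    explicit NonCopyableCommand(std::string name) : _name(std::move(name)) {}

    NonCopyableCommand(const NonCopyableCommand&) = delete;
    NonCopyableCommand& operator=(const NonCopyableCommand&) = delete;

    virtual ~NonCopyableCommand() = default;

private:
    std::string _name;
};

// No need to re-delete the copy operations here: they are implicitly deleted
// because the base class's are.
class ReapNowCommand final : public NonCopyableCommand {
public:
    ReapNowCommand() : NonCopyableCommand("reapLogicalSessionCacheNow") {}
};

static_assert(!std::is_copy_constructible<ReapNowCommand>::value,
              "copying is already ruled out by the base class");

int main() {
    ReapNowCommand cmd;
    (void)cmd;
}
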
diff --git a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
index 8d09a15f17a..b6fb0cb4fd4 100644
--- a/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
+++ b/src/mongo/db/commands/refresh_logical_session_cache_now.cpp
@@ -38,14 +38,9 @@
#include "mongo/db/operation_context.h"
namespace mongo {
-
namespace {
class RefreshLogicalSessionCacheNowCommand final : public BasicCommand {
- RefreshLogicalSessionCacheNowCommand(const RefreshLogicalSessionCacheNowCommand&) = delete;
- RefreshLogicalSessionCacheNowCommand& operator=(const RefreshLogicalSessionCacheNowCommand&) =
- delete;
-
public:
RefreshLogicalSessionCacheNowCommand() : BasicCommand("refreshLogicalSessionCacheNow") {}
@@ -76,10 +71,10 @@ public:
return Status::OK();
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
auto cache = LogicalSessionCache::get(opCtx);
auto client = opCtx->getClient();
@@ -95,5 +90,4 @@ public:
MONGO_REGISTER_TEST_COMMAND(RefreshLogicalSessionCacheNowCommand);
} // namespace
-
} // namespace mongo
diff --git a/src/mongo/db/commands/refresh_sessions_command.cpp b/src/mongo/db/commands/refresh_sessions_command.cpp
index 229db890078..8e2c663e6cc 100644
--- a/src/mongo/db/commands/refresh_sessions_command.cpp
+++ b/src/mongo/db/commands/refresh_sessions_command.cpp
@@ -40,26 +40,28 @@
#include "mongo/db/refresh_sessions_gen.h"
namespace mongo {
+namespace {
class RefreshSessionsCommand final : public BasicCommand {
- RefreshSessionsCommand(const RefreshSessionsCommand&) = delete;
- RefreshSessionsCommand& operator=(const RefreshSessionsCommand&) = delete;
-
public:
RefreshSessionsCommand() : BasicCommand("refreshSessions") {}
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
+
bool adminOnly() const override {
return false;
}
+
bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
+
std::string help() const override {
return "renew a set of logical sessions";
}
+
Status checkAuthForOperation(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj) const override {
@@ -75,18 +77,18 @@ public:
}
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
IDLParserErrorContext ctx("RefreshSessionsCmdFromClient");
auto cmd = RefreshSessionsCmdFromClient::parse(ctx, cmdObj);
- auto res =
- LogicalSessionCache::get(opCtx->getServiceContext())->refreshSessions(opCtx, cmd);
- uassertStatusOK(res);
+ uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions(opCtx, cmd));
return true;
}
+
} refreshSessionsCommand;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/repl/repl_set_commands.cpp b/src/mongo/db/repl/repl_set_commands.cpp
index cf14e81a6aa..a38d510fd5a 100644
--- a/src/mongo/db/repl/repl_set_commands.cpp
+++ b/src/mongo/db/repl/repl_set_commands.cpp
@@ -359,18 +359,20 @@ private:
class CmdReplSetReconfig : public ReplSetCommand {
public:
+ CmdReplSetReconfig() : ReplSetCommand("replSetReconfig") {}
+
std::string help() const override {
return "Adjust configuration of a replica set\n"
"{ replSetReconfig : config_object }\n"
"http://dochub.mongodb.org/core/replicasetcommands";
}
- CmdReplSetReconfig() : ReplSetCommand("replSetReconfig") {}
- virtual bool run(OperationContext* opCtx,
- const string&,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) {
- Status status = ReplicationCoordinator::get(opCtx)->checkReplEnabledForCommand(&result);
- uassertStatusOK(status);
+
+ bool run(OperationContext* opCtx,
+ const string&,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+ const auto replCoord = ReplicationCoordinator::get(opCtx);
+ uassertStatusOK(replCoord->checkReplEnabledForCommand(&result));
if (cmdObj["replSetReconfig"].type() != Object) {
result.append("errmsg", "no configuration specified");
@@ -380,23 +382,22 @@ public:
ReplicationCoordinator::ReplSetReconfigArgs parsedArgs;
parsedArgs.newConfigObj = cmdObj["replSetReconfig"].Obj();
parsedArgs.force = cmdObj.hasField("force") && cmdObj["force"].trueValue();
- status =
- ReplicationCoordinator::get(opCtx)->processReplSetReconfig(opCtx, parsedArgs, &result);
-
- Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ auto status = replCoord->processReplSetReconfig(opCtx, parsedArgs, &result);
- WriteUnitOfWork wuow(opCtx);
if (status.isOK() && !parsedArgs.force) {
+ const auto service = opCtx->getServiceContext();
+
+ Lock::GlobalLock globalLock(opCtx, MODE_IX);
+ WriteUnitOfWork wuow(opCtx);
// Users must not be allowed to provide their own contents for the o2 field.
// o2 field of no-ops is supposed to be used internally.
- getGlobalServiceContext()->getOpObserver()->onOpMessage(
- opCtx,
- BSON("msg"
- << "Reconfig set"
- << "version"
- << parsedArgs.newConfigObj["version"]));
+ service->getOpObserver()->onOpMessage(opCtx,
+ BSON("msg"
+ << "Reconfig set"
+ << "version"
+ << parsedArgs.newConfigObj["version"]));
+ wuow.commit();
}
- wuow.commit();
uassertStatusOK(status);
return true;
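
The restructuring above narrows the global lock and the write unit of work to the branch that actually writes the no-op oplog entry, and commits inside that branch rather than unconditionally. The same shape with a generic RAII guard; UnitOfWork here is an illustrative stand-in, not MongoDB's WriteUnitOfWork:

#include <iostream>
#include <mutex>

// Illustrative RAII unit of work: rolls back unless commit() is called.
class UnitOfWork {
public:
    ~UnitOfWork() {
        if (!_committed)
            std::cout << "rolling back\n";
    }
    void commit() {
        std::cout << "committing\n";
        _committed = true;
    }

private:
    bool _committed = false;
};

std::mutex globalMutex;

void reconfigure(bool statusOk, bool force) {
    // Only take the lock and open a unit of work when there is something to write.
    if (statusOk && !force) {
        std::lock_guard<std::mutex> lock(globalMutex);
        UnitOfWork uow;
        std::cout << "writing no-op oplog entry\n";
        uow.commit();  // commit before the guard goes out of scope
    }
    // Error handling for the non-OK status would go here.
}

int main() {
    reconfigure(true, false);
    reconfigure(false, false);
}
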
diff --git a/src/mongo/db/s/transaction_coordinator_service.cpp b/src/mongo/db/s/transaction_coordinator_service.cpp
index d803e0987a3..2e93ca54435 100644
--- a/src/mongo/db/s/transaction_coordinator_service.cpp
+++ b/src/mongo/db/s/transaction_coordinator_service.cpp
@@ -169,7 +169,9 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
LOG(0) << "Need to resume coordinating commit for " << coordinatorDocs.size()
<< " transactions";
- auto clockSource = opCtx->getServiceContext()->getFastClockSource();
+ const auto service = opCtx->getServiceContext();
+ const auto clockSource = service->getFastClockSource();
+
auto& catalog = catalogAndScheduler->catalog;
auto& scheduler = catalogAndScheduler->scheduler;
@@ -180,7 +182,7 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
const auto txnNumber = *doc.getId().getTxnNumber();
auto coordinator = std::make_shared<TransactionCoordinator>(
- opCtx->getServiceContext(),
+ service,
lsid,
txnNumber,
scheduler.makeChildScheduler(),
@@ -191,9 +193,6 @@ void TransactionCoordinatorService::onStepUp(OperationContext* opCtx,
}
})
.tapAll([catalogAndScheduler = _catalogAndScheduler](Status status) {
- // TODO (SERVER-38320): Reschedule the step-up task if the interruption was not due
- // to stepdown.
-
auto& catalog = catalogAndScheduler->catalog;
catalog.exitStepUp(status);
});
diff --git a/src/mongo/db/session.h b/src/mongo/db/session.h
index 635f3fc7167..b26eb7cdb19 100644
--- a/src/mongo/db/session.h
+++ b/src/mongo/db/session.h
@@ -72,6 +72,9 @@ private:
// practice, it is only used inside of the SessionCatalog itself.
OperationContext* _checkoutOpCtx{nullptr};
+ // Keeps track of the last time this session was checked out
+ Date_t _lastCheckout{Date_t::now()};
+
// Counter indicating the number of times ObservableSession::kill has been called on this
// session, which have not yet had a corresponding call to checkOutSessionForKill.
int _killsRequested{0};
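
The new _lastCheckout member is the timestamp the reaper later compares against its cutoff; the catalog refreshes it on every check-out. A minimal sketch of that bookkeeping with illustrative names (synchronization is left to the owning catalog, just as the real field is guarded by the catalog's mutex):

#include <chrono>
#include <string>

using Clock = std::chrono::system_clock;

class Session {
public:
    explicit Session(std::string lsid) : _lsid(std::move(lsid)) {}

    // Called by the owning catalog every time the session is checked out.
    void noteCheckedOut() {
        _lastCheckout = Clock::now();
    }

    // Used by the reaper to decide whether this session is old enough to be
    // a reap candidate.
    Clock::time_point lastCheckout() const {
        return _lastCheckout;
    }

private:
    std::string _lsid;
    Clock::time_point _lastCheckout{Clock::now()};  // defaults to creation time
};
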
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 2e0d6de0953..db574180f6f 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/session_catalog.h"
-#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/stdx/memory.h"
#include "mongo/util/log.h"
@@ -84,12 +83,16 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSession(Operati
// Wait until the session is no longer checked out and until the previously scheduled kill has
// completed
+ ++sri->numWaitingToCheckOut;
+ ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; });
+
opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri]() {
ObservableSession osession(ul, sri->session);
return !osession.currentOperation() && !osession._killed();
});
sri->session._checkoutOpCtx = opCtx;
+ sri->session._lastCheckout = Date_t::now();
return ScopedCheckedOutSession(
*this, std::move(sri), boost::none /* Not checked out for kill */);
@@ -107,33 +110,62 @@ SessionCatalog::SessionToKill SessionCatalog::checkOutSessionForKill(OperationCo
invariant(ObservableSession(ul, sri->session)._killed());
// Wait until the session is no longer checked out
+ ++sri->numWaitingToCheckOut;
+ ON_BLOCK_EXIT([&] { --sri->numWaitingToCheckOut; });
+
opCtx->waitForConditionOrInterrupt(sri->availableCondVar, ul, [&ul, &sri] {
ObservableSession osession(ul, sri->session);
return !osession.currentOperation();
});
sri->session._checkoutOpCtx = opCtx;
+ sri->session._lastCheckout = Date_t::now();
return SessionToKill(ScopedCheckedOutSession(*this, std::move(sri), std::move(killToken)));
}
void SessionCatalog::scanSession(const LogicalSessionId& lsid,
const ScanSessionsCallbackFn& workerFn) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
- auto it = _sessions.find(lsid);
- if (it != _sessions.end())
- workerFn({lg, it->second->session});
+ std::unique_ptr<SessionRuntimeInfo> sessionToReap;
+
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ auto it = _sessions.find(lsid);
+ if (it != _sessions.end()) {
+ auto& sri = it->second;
+ ObservableSession osession(lg, sri->session);
+ workerFn(osession);
+
+ if (osession._markedForReap && !osession._killed() && !osession.currentOperation() &&
+ !sri->numWaitingToCheckOut) {
+ sessionToReap = std::move(sri);
+ _sessions.erase(it);
+ }
+ }
+ }
}
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn) {
- stdx::lock_guard<stdx::mutex> lg(_mutex);
+ std::vector<std::unique_ptr<SessionRuntimeInfo>> sessionsToReap;
+
+ {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
- LOG(2) << "Beginning scanSessions. Scanning " << _sessions.size() << " sessions.";
+ LOG(2) << "Beginning scanSessions. Scanning " << _sessions.size() << " sessions.";
- for (auto& sessionEntry : _sessions) {
- if (matcher.match(sessionEntry.first)) {
- workerFn({lg, sessionEntry.second->session});
+ for (auto it = _sessions.begin(); it != _sessions.end(); ++it) {
+ if (matcher.match(it->first)) {
+ auto& sri = it->second;
+ ObservableSession osession(lg, sri->session);
+ workerFn(osession);
+
+ if (osession._markedForReap && !osession._killed() &&
+ !osession.currentOperation() && !sri->numWaitingToCheckOut) {
+ sessionsToReap.emplace_back(std::move(sri));
+ _sessions.erase(it++);
+ }
+ }
}
}
}
@@ -147,6 +179,11 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls
return ObservableSession(lg, sri->session).kill();
}
+size_t SessionCatalog::size() const {
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
+ return _sessions.size();
+}
+
SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeInfo(
WithLock, OperationContext* opCtx, const LogicalSessionId& lsid) {
auto it = _sessions.find(lsid);
@@ -174,6 +211,10 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
}
}
+SessionCatalog::SessionRuntimeInfo::~SessionRuntimeInfo() {
+ invariant(!numWaitingToCheckOut);
+}
+
SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) const {
const bool firstKiller = (0 == _session->_killsRequested);
++_session->_killsRequested;
@@ -189,6 +230,10 @@ SessionCatalog::KillToken ObservableSession::kill(ErrorCodes::Error reason) cons
return SessionCatalog::KillToken(getSessionId());
}
+void ObservableSession::markForReap() {
+ _markedForReap = true;
+}
+
bool ObservableSession::_killed() const {
return _session->_killsRequested > 0;
}
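
The numWaitingToCheckOut counter added above is what keeps the scan from reaping an entry that another thread is already blocked waiting to check out: an entry is erased only if it is marked for reap, not killed, not checked out, and has no waiters. A compact standalone version of that pattern, with illustrative names and the ON_BLOCK_EXIT guard simplified to a plain decrement after the wait:

#include <condition_variable>
#include <map>
#include <memory>
#include <mutex>
#include <string>

struct SessionEntry {
    bool checkedOut = false;
    bool markedForReap = false;
    int numWaitingToCheckOut = 0;
    std::condition_variable availableCondVar;
};

class Catalog {
public:
    void checkOut(const std::string& lsid) {
        std::unique_lock<std::mutex> ul(_mutex);
        auto& entry = _entries[lsid];
        if (!entry)
            entry = std::make_unique<SessionEntry>();

        // Count ourselves as a waiter so a concurrent scan cannot reap the
        // entry while we are blocked on the condition variable.
        ++entry->numWaitingToCheckOut;
        entry->availableCondVar.wait(ul, [&] { return !entry->checkedOut; });
        --entry->numWaitingToCheckOut;

        entry->checkedOut = true;
    }

    void checkIn(const std::string& lsid) {
        std::lock_guard<std::mutex> lg(_mutex);
        auto& entry = _entries.at(lsid);
        entry->checkedOut = false;
        entry->availableCondVar.notify_all();
    }

    void markForReap(const std::string& lsid) {
        std::lock_guard<std::mutex> lg(_mutex);
        auto it = _entries.find(lsid);
        if (it != _entries.end())
            it->second->markedForReap = true;
    }

    // Erases entries that were marked for reap, but only if nobody holds them
    // checked out and nobody is blocked waiting to check them out.
    void reapMarkedEntries() {
        std::lock_guard<std::mutex> lg(_mutex);
        for (auto it = _entries.begin(); it != _entries.end();) {
            const auto& entry = it->second;
            if (entry->markedForReap && !entry->checkedOut && entry->numWaitingToCheckOut == 0)
                it = _entries.erase(it);
            else
                ++it;
        }
    }

private:
    std::mutex _mutex;
    std::map<std::string, std::unique_ptr<SessionEntry>> _entries;
};
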
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index c88ed62eb96..d0f97902461 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -98,7 +98,7 @@ public:
* Iterates through the SessionCatalog and applies 'workerFn' to each Session. This locks the
* SessionCatalog.
*/
- using ScanSessionsCallbackFn = stdx::function<void(const ObservableSession&)>;
+ using ScanSessionsCallbackFn = stdx::function<void(ObservableSession&)>;
void scanSession(const LogicalSessionId& lsid, const ScanSessionsCallbackFn& workerFn);
void scanSessions(const SessionKiller::Matcher& matcher,
const ScanSessionsCallbackFn& workerFn);
@@ -109,18 +109,30 @@ public:
*/
KillToken killSession(const LogicalSessionId& lsid);
+ /**
+ * Returns the total number of entries currently cached in the session catalog.
+ */
+ size_t size() const;
+
private:
struct SessionRuntimeInfo {
SessionRuntimeInfo(LogicalSessionId lsid) : session(std::move(lsid)) {}
+ ~SessionRuntimeInfo();
// Must only be accessed when the state is kInUse and only by the operation context, which
// currently has it checked out
Session session;
+ // Counts how many threads have called checkOutSession/checkOutSessionForKill and are
+ // blocked in it waiting for the session to become available. Used to block reaping of
+ // session entries from the map.
+ int numWaitingToCheckOut{0};
+
// Signaled when the state becomes available. Uses the transaction table's mutex to protect
// the state transitions.
stdx::condition_variable availableCondVar;
};
+ using SessionRuntimeInfoMap = LogicalSessionIdMap<std::unique_ptr<SessionRuntimeInfo>>;
/**
* Blocking method, which checks-out the session set on 'opCtx'.
@@ -141,10 +153,10 @@ private:
void _releaseSession(SessionRuntimeInfo* sri, boost::optional<KillToken> killToken);
// Protects the state below
- stdx::mutex _mutex;
+ mutable stdx::mutex _mutex;
// Owns the Session objects for all current Sessions.
- LogicalSessionIdMap<std::unique_ptr<SessionRuntimeInfo>> _sessions;
+ SessionRuntimeInfoMap _sessions;
};
/**
@@ -205,10 +217,10 @@ private:
class SessionCatalog::SessionToKill {
public:
SessionToKill(ScopedCheckedOutSession&& scos) : _scos(std::move(scos)) {}
+
Session* get() const {
return _scos.get();
}
-
const LogicalSessionId& getSessionId() const {
return get()->getSessionId();
}
@@ -250,6 +262,13 @@ public:
}
/**
+ * Returns the last time this session was checked out, for reaping purposes.
+ */
+ Date_t getLastCheckout() const {
+ return _session->_lastCheckout;
+ }
+
+ /**
* Increments the number of "killers" for this session and returns a 'kill token' to be
* passed later on to 'checkOutSessionForKill' method of the SessionCatalog in order to permit
* the caller to execute any kill cleanup tasks. This token is later used to decrement the
@@ -265,6 +284,16 @@ public:
SessionCatalog::KillToken kill(ErrorCodes::Error reason = ErrorCodes::Interrupted) const;
/**
+ * Indicates to the SessionCatalog that the session tracked by this object is safe to be deleted
+ * from the map. It is up to the caller to verify that all the decorations they are using are
+ * prepared to be destroyed.
+ *
+ * Calling this method does not guarantee that the session will in fact be destroyed; for example,
+ * it will not be reaped while there are threads waiting for it to be checked out.
+ */
+ void markForReap();
+
+ /**
* Returns a pointer to the Session itself.
*/
Session* get() const {
@@ -291,6 +320,7 @@ private:
Session* _session;
stdx::unique_lock<Client> _clientLock;
+ bool _markedForReap{false};
};
/**
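
Making _mutex mutable is what allows the new size() accessor to be const yet still take the lock; locking is not considered part of the object's logical state. The same idea in isolation, with illustrative names:

#include <cstddef>
#include <map>
#include <mutex>
#include <string>

class SessionRegistry {
public:
    void add(const std::string& lsid) {
        std::lock_guard<std::mutex> lg(_mutex);
        _sessions.emplace(lsid, 0);
    }

    // A const accessor can still lock because the mutex is 'mutable': taking
    // the lock does not change the registry's logical state.
    std::size_t size() const {
        std::lock_guard<std::mutex> lg(_mutex);
        return _sessions.size();
    }

private:
    mutable std::mutex _mutex;
    std::map<std::string, int> _sessions;
};
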
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 0611994c6be..518b871f3fc 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -141,6 +141,7 @@ int removeSessionsTransactionRecords(OperationContext* opCtx,
if (expiredSessionIds.empty())
return 0;
+ // Remove the session ids from the on-disk catalog
write_ops::Delete deleteOp(NamespaceString::kSessionTransactionsTableNamespace);
deleteOp.setWriteCommandBase([] {
write_ops::WriteCommandBase base;
@@ -343,6 +344,42 @@ void MongoDSessionCatalog::invalidateAllSessions(OperationContext* opCtx) {
int MongoDSessionCatalog::reapSessionsOlderThan(OperationContext* opCtx,
SessionsCollection& sessionsCollection,
Date_t possiblyExpired) {
+ {
+ const auto catalog = SessionCatalog::get(opCtx);
+
+ // Capture the possibly expired in-memory session ids
+ LogicalSessionIdSet lsids;
+ catalog->scanSessions(SessionKiller::Matcher(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(opCtx)}),
+ [&](const ObservableSession& session) {
+ if (session.getLastCheckout() < possiblyExpired) {
+ lsids.insert(session.getSessionId());
+ }
+ });
+
+ // From the passed-in sessions, find the ones which are actually expired/removed
+ auto expiredSessionIds =
+ uassertStatusOK(sessionsCollection.findRemovedSessions(opCtx, lsids));
+
+ // Remove the session ids from the in-memory catalog
+ for (const auto& lsid : expiredSessionIds) {
+ catalog->scanSession(lsid, [](ObservableSession& session) {
+ const auto participant = TransactionParticipant::get(session);
+ if (!participant.inMultiDocumentTransaction()) {
+ session.markForReap();
+ }
+ });
+ }
+ }
+
+ // The "unsafe" check for primary below is a best-effort attempt to ensure that the on-disk
+ // state reaping code doesn't run when the node is secondary, which would cause log spam. It
+ // works around the fact that the logical sessions cache is not registered to listen for
+ // replication state changes.
+ const auto replCoord = repl::ReplicationCoordinator::get(opCtx);
+ if (!replCoord->canAcceptWritesForDatabase_UNSAFE(opCtx, NamespaceString::kConfigDb))
+ return 0;
+
// Scans for records older than the minimum lifetime and uses a sort to walk the '_id' index
DBDirectClient client(opCtx);
auto cursor =
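
Putting the pieces together, the new reapSessionsOlderThan flow is: collect lsids whose last check-out precedes the cutoff, ask the sessions collection which of those no longer exist, mark for reap only the ones without an active multi-document transaction, and skip the on-disk pass entirely when the node cannot accept writes. A schematic standalone version, with callbacks standing in for the sessions collection and the replication check (all names here are illustrative):

#include <chrono>
#include <functional>
#include <set>
#include <string>
#include <unordered_map>

using Clock = std::chrono::system_clock;

struct SessionInfo {
    Clock::time_point lastCheckout;
    bool inActiveTransaction = false;
    bool markedForReap = false;
};

// Hypothetical stand-ins for the sessions collection and the replication state.
using FindRemovedSessionsFn = std::function<std::set<std::string>(const std::set<std::string>&)>;
using CanAcceptWritesFn = std::function<bool()>;

int reapSessionsOlderThan(std::unordered_map<std::string, SessionInfo>& inMemoryCatalog,
                          Clock::time_point possiblyExpired,
                          const FindRemovedSessionsFn& findRemovedSessions,
                          const CanAcceptWritesFn& canAcceptWrites) {
    // Pass 1: capture the possibly expired in-memory session ids.
    std::set<std::string> candidates;
    for (const auto& [lsid, info] : inMemoryCatalog) {
        if (info.lastCheckout < possiblyExpired)
            candidates.insert(lsid);
    }

    // Pass 2: of those, mark for reap only the ones the authoritative
    // collection no longer knows about and that have no active transaction.
    for (const auto& lsid : findRemovedSessions(candidates)) {
        auto it = inMemoryCatalog.find(lsid);
        if (it != inMemoryCatalog.end() && !it->second.inActiveTransaction)
            it->second.markedForReap = true;
    }

    // Best-effort: do not touch the on-disk transactions table unless this
    // node can currently accept writes (i.e. it believes it is primary).
    if (!canAcceptWrites())
        return 0;

    int numReapedOnDisk = 0;
    // ... on-disk reaping of the transaction records would go here ...
    return numReapedOnDisk;
}
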
diff --git a/src/mongo/db/session_catalog_mongod.h b/src/mongo/db/session_catalog_mongod.h
index b55f8927f3f..591d234029b 100644
--- a/src/mongo/db/session_catalog_mongod.h
+++ b/src/mongo/db/session_catalog_mongod.h
@@ -69,6 +69,8 @@ public:
/**
* Locates session entries from the in-memory catalog and in 'config.transactions' which have
* not been referenced before 'possiblyExpired' and deletes them.
+ *
+ * Returns the number of sessions that were reaped from the persisted on-disk store.
*/
static int reapSessionsOlderThan(OperationContext* opCtx,
SessionsCollection& sessionsCollection,
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 3bd63277365..da7712f89d4 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -117,6 +117,67 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
ASSERT(!OperationContextSession::get(_opCtx));
}
+TEST_F(SessionCatalogTest, ScanSession) {
+ // Create three sessions in the catalog.
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest()};
+ for (const auto& lsid : lsids) {
+ stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
+ }).get();
+ }
+
+ catalog()->scanSession(lsids[0], [&lsids](const ObservableSession& session) {
+ ASSERT_EQ(lsids[0], session.get()->getSessionId());
+ });
+
+ catalog()->scanSession(lsids[1], [&lsids](const ObservableSession& session) {
+ ASSERT_EQ(lsids[1], session.get()->getSessionId());
+ });
+
+ catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) {
+ ASSERT_EQ(lsids[2], session.get()->getSessionId());
+ });
+
+ catalog()->scanSession(makeLogicalSessionIdForTest(), [](const ObservableSession&) {
+ FAIL("The callback was called for non-existent session");
+ });
+}
+
+TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) {
+ // Create three sessions in the catalog.
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest()};
+ for (const auto& lsid : lsids) {
+ stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsid);
+ OperationContextSession ocs(opCtx.get());
+ }).get();
+ }
+
+ catalog()->scanSession(lsids[0],
+ [&lsids](ObservableSession& session) { session.markForReap(); });
+
+ catalog()->scanSession(lsids[0], [](const ObservableSession&) {
+ FAIL("The callback was called for non-existent session");
+ });
+
+ catalog()->scanSession(lsids[1], [&lsids](const ObservableSession& session) {
+ ASSERT_EQ(lsids[1], session.get()->getSessionId());
+ });
+
+ catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) {
+ ASSERT_EQ(lsids[2], session.get()->getSessionId());
+ });
+}
+
TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
std::vector<LogicalSessionId> lsidsFound;
const auto workerFn = [&lsidsFound](const ObservableSession& session) {
@@ -157,6 +218,47 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
lsidsFound.clear();
}
+TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) {
+ // Create three sessions in the catalog.
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest()};
+
+ unittest::Barrier sessionsCheckedOut(2);
+ unittest::Barrier sessionsCheckedIn(2);
+
+ auto f = stdx::async(stdx::launch::async, [&] {
+ ThreadClient tc(getServiceContext());
+ auto opCtx = makeOperationContext();
+ opCtx->setLogicalSessionId(lsids[1]);
+ OperationContextSession ocs(opCtx.get());
+ sessionsCheckedOut.countDownAndWait();
+ sessionsCheckedIn.countDownAndWait();
+ });
+
+ // After this wait, session 1 is checked out and blocked on the barrier, so only sessions 0
+ // and 2 can be reaped
+ sessionsCheckedOut.countDownAndWait();
+
+ SessionKiller::Matcher matcherAllSessions(
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)});
+
+ catalog()->scanSessions(matcherAllSessions,
+ [&](ObservableSession& session) { session.markForReap(); });
+
+ catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ ASSERT_EQ(lsids[1], session.get()->getSessionId());
+ });
+
+ // After this point, session 1 is checked back in
+ sessionsCheckedIn.countDownAndWait();
+ f.get();
+
+ catalog()->scanSessions(matcherAllSessions, [&](const ObservableSession& session) {
+ ASSERT_EQ(lsids[1], session.get()->getSessionId());
+ });
+}
+
TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
const auto lsid = makeLogicalSessionIdForTest();
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index 7ffc762201e..6421302ab28 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -46,9 +46,6 @@
#include "mongo/stdx/memory.h"
namespace mongo {
-
-constexpr StringData SessionsCollection::kSessionsTTLIndex;
-
namespace {
// This batch size is chosen to ensure that we don't form requests larger than the 16mb limit.
@@ -161,6 +158,10 @@ Status runBulkCmd(StringData label,
} // namespace
+constexpr StringData SessionsCollection::kSessionsTTLIndex;
+
+SessionsCollection::SessionsCollection() = default;
+
SessionsCollection::~SessionsCollection() = default;
SessionsCollection::SendBatchFn SessionsCollection::makeSendFnForBatchWrite(
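
The relocated constexpr StringData SessionsCollection::kSessionsTTLIndex; line is the out-of-line definition that C++14 still requires for a static constexpr data member that is odr-used (for example, bound to a reference); C++17 makes such members implicitly inline. A minimal illustration of the rule, with a plain const char* standing in for StringData:

#include <iostream>

struct SessionsConstants {
    static constexpr const char* kSessionsTTLIndex = "lsidTTLIndex";
};

// Under C++14 this out-of-line definition is required once the member is
// odr-used (e.g. bound to a reference); under C++17 the member is implicitly
// inline and the line becomes redundant but remains legal.
constexpr const char* SessionsConstants::kSessionsTTLIndex;

void printIndexName(const char* const& name) {  // takes a reference: odr-use
    std::cout << name << "\n";
}

int main() {
    printIndexName(SessionsConstants::kSessionsTTLIndex);
}
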
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 64e78c8a476..91ae86e7c9f 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -34,8 +34,6 @@
namespace mongo {
-class BSONArrayBuilder;
-class BSONObjBuilder;
class DBClientBase;
class OperationContext;
@@ -46,7 +44,6 @@ class OperationContext;
* implement their own classes that fulfill this interface.
*/
class SessionsCollection {
-
public:
static constexpr StringData kSessionsTTLIndex = "lsidTTLIndex"_sd;
@@ -98,6 +95,8 @@ public:
static BSONObj generateCollModCmd();
protected:
+ SessionsCollection();
+
/**
* Makes a send function for the given client.
*/
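
The protected default constructor added above makes explicit that SessionsCollection is an interface meant to be constructed only as a base subobject of its implementations. The same idea in isolation, with illustrative types:

#include <memory>
#include <string>

class Collection {
public:
    virtual ~Collection() = default;
    virtual std::string name() const = 0;

protected:
    Collection() = default;  // only derived classes may construct the base
};

class StandaloneCollection final : public Collection {
public:
    std::string name() const override {
        return "config.system.sessions";
    }
};

int main() {
    // Collection c;  // would not compile: abstract, and the constructor is protected
    std::unique_ptr<Collection> c = std::make_unique<StandaloneCollection>();
    (void)c->name();
}
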
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 3ba34fd40b3..548453ab45a 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -291,7 +291,9 @@ const BSONObj TransactionParticipant::kDeadEndSentinel(BSON("$incompleteOplogHis
TransactionParticipant::TransactionParticipant() = default;
-TransactionParticipant::~TransactionParticipant() = default;
+TransactionParticipant::~TransactionParticipant() {
+ // invariant(!_o.txnState.isInProgress());
+}
TransactionParticipant::Observer::Observer(const ObservableSession& osession)
: Observer(&getTransactionParticipant(osession.get())) {}
@@ -2027,20 +2029,18 @@ void TransactionParticipant::Participant::invalidate(OperationContext* opCtx) {
void TransactionParticipant::Participant::abortPreparedTransactionForRollback(
OperationContext* opCtx) {
- stdx::lock_guard<Client> lg(*opCtx->getClient());
-
- // Invalidate the session.
- _invalidate(lg);
-
uassert(51030,
str::stream() << "Cannot call abortPreparedTransactionForRollback on unprepared "
<< "transaction.",
o().txnState.isPrepared());
+ stdx::lock_guard<Client> lg(*opCtx->getClient());
+
// It should be safe to clear transactionOperationBytes and transactionOperations because
// we only modify these variables when adding an operation to a transaction. Since this
// transaction is already prepared, we cannot add more operations to it. We will have this
// in the prepare oplog entry.
+ _invalidate(lg);
_resetTransactionState(lg, TransactionState::kNone);
}