summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp110
1 files changed, 52 insertions, 58 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 24d278c9150..2095595eb5f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) {
} // namespace
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(
- std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)) {
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -73,26 +71,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
[this](Client* client) { _periodicRefresh(client); },
Milliseconds(logicalSessionRefreshMillis)});
- if (_transactionReaper) {
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- Milliseconds(logicalSessionRefreshMillis)});
- }
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread, log but swallow the
- // error since there is no good way to recover
- severe() << "Failed to join background service thread";
- }
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
@@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() {
}
size_t LogicalSessionCacheImpl::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
}
@@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) {
}
Status LogicalSessionCacheImpl::_reap(Client* client) {
- if (!_transactionReaper) {
- return Status::OK();
- }
-
// Take the lock to update some stats.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the last set of stats for our new run.
_stats.setLastTransactionReaperJobDurationMillis(0);
@@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
- int numReaped = 0;
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- try {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ int numReaped = 0;
+ try {
ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
@@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- numReaped = _transactionReaper->reap(opCtx);
- } catch (...) {
+ numReaped =
+ _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ opCtx->getServiceContext()->getFastClockSource()->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
- return exceptionToStatus();
+ return ex.toStatus();
}
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
@@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
void LogicalSessionCacheImpl::_refresh(Client* client) {
// Stats for serverStatus:
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the refresh-related stats with the beginning of our run.
_stats.setLastSessionsCollectionJobDurationMillis(0);
@@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = makeGuard([this] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
{
using std::swap;
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
swap(explicitlyEndingSessions, _endingSessions);
swap(activeSessions, _activeSessions);
}
@@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// swapped out of LogicalSessionCache, and merges in any records that had been added since we
// swapped them out.
auto backSwap = [this](auto& member, auto& temp) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
using std::swap;
swap(member, temp);
for (const auto& it : temp) {
@@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
activeSessions.erase(lsid);
}
- // refresh all recently active sessions as well as for sessions attached to running ops
-
- LogicalSessionRecordSet activeSessionRecords{};
+ // Refresh all recently active sessions as well as for sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
auto runningOpSessions = _service->getActiveOpSessions();
@@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
}
@@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
}
@@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
// killing cursors on the removed sessions and creating sessions.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
for (const auto& it : _activeSessions) {
auto newSessionIt = openCursorSessions.find(it.first);
@@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
SessionKiller::Matcher matcher(std::move(patterns));
auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
}
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_endingSessions.insert(begin(sessions), end(sessions));
}
LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setActiveSessionsCount(_activeSessions.size());
return _stats;
}
Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
}
+
_activeSessions.insert(std::make_pair(record.getId(), record));
return Status::OK();
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
ret.reserve(_activeSessions.size());
for (const auto& id : _activeSessions) {
@@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
@@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto it = _activeSessions.find(id);
if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
}
+
} // namespace mongo