summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp216
1 files changed, 110 insertions, 106 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index ffafcccbd4f..624934c2785 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -41,13 +41,10 @@
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
#include "mongo/util/periodic_runner.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize,
- int,
- LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity);
-
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(
logicalSessionRefreshMinutes,
int,
@@ -55,7 +52,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false);
-constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
LogicalSessionCacheImpl::LogicalSessionCacheImpl(
@@ -67,8 +63,7 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
_sessionTimeout(options.sessionTimeout),
_service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)),
- _cache(options.capacity) {
+ _transactionReaper(std::move(transactionReaper)) {
if (!disableLogicalSessionCacheRefresh) {
_service->scheduleJob(
{[this](Client* client) { _periodicRefresh(client); }, _refreshInterval});
@@ -88,30 +83,27 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- auto it = _cache.find(lsid);
- if (it == _cache.end()) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ auto it = _activeSessions.find(lsid);
+ if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
}
- // Update the last use time before returning.
- it->second.setLastUse(now());
return Status::OK();
}
-Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
+void LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
// Add the new record to our local cache. We will insert it into the sessions collection
// the next time _refresh is called. If there is already a record in the cache for this
// session, we'll just write over it with our newer, more recent one.
_addToCache(record);
- return Status::OK();
}
Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
const RefreshSessionsCmdFromClient& cmd) {
// Update the timestamps of all these records in our cache.
auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx);
- for (auto& lsid : sessions) {
+ for (const auto& lsid : sessions) {
if (!promote(lsid).isOK()) {
// This is a new record, insert it.
_addToCache(makeLogicalSessionRecord(opCtx, lsid, now()));
@@ -127,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
// Update the timestamps of all these records in our cache.
auto records = cmd.getRefreshSessionsInternal();
- for (auto& record : records) {
+ for (const auto& record : records) {
if (!promote(record.getId()).isOK()) {
// This is a new record, insert it.
_addToCache(record);
@@ -136,17 +128,22 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
}
// Write to the sessions collection now.
- return _sessionsColl->refreshSessions(opCtx, toRefresh, now());
+ return _sessionsColl->refreshSessions(opCtx, toRefresh);
}
void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) {
if (!promote(lsid).isOK()) {
- startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())).ignore();
+ startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now()));
}
}
Status LogicalSessionCacheImpl::refreshNow(Client* client) {
- return _refresh(client);
+ try {
+ _refresh(client);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ return Status::OK();
}
Status LogicalSessionCacheImpl::reapNow(Client* client) {
@@ -159,16 +156,15 @@ Date_t LogicalSessionCacheImpl::now() {
size_t LogicalSessionCacheImpl::size() {
stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
- return _cache.size();
+ return _activeSessions.size();
}
void LogicalSessionCacheImpl::_periodicRefresh(Client* client) {
- auto res = _refresh(client);
- if (!res.isOK()) {
- log() << "Failed to refresh session cache: " << res;
+ try {
+ _refresh(client);
+ } catch (...) {
+ log() << "Failed to refresh session cache: " << exceptionToStatus();
}
-
- return;
}
void LogicalSessionCacheImpl::_periodicReap(Client* client) {
@@ -204,115 +200,123 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
-Status LogicalSessionCacheImpl::_refresh(Client* client) {
- LogicalSessionRecordSet activeSessions;
- LogicalSessionRecordSet deadSessions;
-
- auto time = now();
-
- // We should avoid situations where we have records in the cache
- // that have been expired from the sessions collection. If they haven't been
- // used in _sessionTimeout, we should just remove them.
+void LogicalSessionCacheImpl::_refresh(Client* client) {
+ LogicalSessionIdSet staleSessions;
+ LogicalSessionIdSet explicitlyEndingSessions;
+ LogicalSessionIdMap<LogicalSessionRecord> activeSessions;
+
+ // backSwapper creates a guard that in the case of a exception
+ // replaces the ending or active sessions that swapped out of of LogicalSessionCache,
+ // and merges in any records that had been added since we swapped them
+ // out.
+ auto backSwapper = [this](auto& member, auto& temp) {
+ return MakeGuard([this, &member, &temp] {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ using std::swap;
+ swap(member, temp);
+ for (const auto& it : temp) {
+ member.emplace(it);
+ }
+ });
+ };
- // Assemble a list of active session records in our cache
- std::vector<decltype(_cache)::ListEntry> cacheCopy;
{
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- cacheCopy.assign(_cache.begin(), _cache.end());
+ using std::swap;
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ swap(explicitlyEndingSessions, _endingSessions);
+ swap(activeSessions, _activeSessions);
}
+ auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions);
+ auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions);
- for (auto& it : cacheCopy) {
- auto record = it.second;
- if (!_isDead(record, time)) {
- activeSessions.insert(record);
- } else {
- deadSessions.insert(record);
+ // get or make an opCtx
+
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
}
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+
+ // remove all explicitlyEndingSessions from activeSessions
+ for (const auto& lsid : explicitlyEndingSessions) {
+ activeSessions.erase(lsid);
}
- // Append any active sessions from the service. We should always have
- // cache entries for active sessions. If we don't, then it is a sign that
- // the cache needs to be larger, because active session records are being
- // evicted.
-
- // Promote our cached entries for all active service sessions to be recently-
- // used, and update their lastUse dates so we don't lose them to eviction. We
- // do not need to do this with records from our own cache, which are being used
- // regularly. Sessions for long-running queries, however, must be kept alive
- // by us here.
- auto serviceSessions = _service->getActiveSessions();
- {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- for (auto lsid : serviceSessions) {
- auto it = _cache.promote(lsid);
- if (it != _cache.end()) {
- // If we have not found our record, it may have been removed
- // by another thread.
- it->second.setLastUse(time);
- activeSessions.insert(it->second);
- }
+ // refresh all recently active sessions as well as for sessions attached to running ops
- // TODO SERVER-29709: Rethink how active sessions interact with refreshes,
- // and potentially move this block above the block where we separate
- // dead sessions from live sessions, above.
- activeSessions.insert(makeLogicalSessionRecord(lsid, time));
+ LogicalSessionRecordSet activeSessionRecords{};
+
+ auto runningOpSessions = _service->getActiveOpSessions();
+
+ for (const auto& it : runningOpSessions) {
+ // if a running op is the cause of an upsert, we won't have a user name for the record
+ if (explicitlyEndingSessions.count(it) > 0) {
+ continue;
}
+ LogicalSessionRecord lsr;
+ lsr.setId(it);
+ activeSessionRecords.insert(lsr);
+ }
+ for (const auto& it : activeSessions) {
+ activeSessionRecords.insert(it.second);
}
+ // refresh the active sessions in the sessions collection
+ uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
+ activeSessionsBackSwapper.Dismiss();
- // Query into the sessions collection to do the refresh. If any sessions have
- // failed to refresh, it means their authoritative records were removed, and
- // we should remove such records from our cache as well.
- {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ // remove the ending sessions from the sessions collection
+ uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
+ explicitlyEndingBackSwaper.Dismiss();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ // Find which running, but not recently active sessions, are expired, and add them
+ // to the list of sessions to kill cursors for
+
+ KillAllSessionsByPatternSet patterns;
+
+ auto openCursorSessions = _service->getOpenCursorSessions();
+ // think about pruning ending and active out of openCursorSessions
+ auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions);
- auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time);
- if (!res.isOK()) {
- // TODO SERVER-29709: handle network errors here.
- return res;
+ if (statusAndRemovedSessions.isOK()) {
+ auto removedSessions = statusAndRemovedSessions.getValue();
+ for (const auto& lsid : removedSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
}
}
- // Prune any dead records out of the cache. Dead records are ones that failed to
- // refresh, or ones that have expired locally. We don't make an effort to check
- // if the locally-expired records still have live authoritative records in the
- // sessions collection. We also don't attempt to resurrect our expired records.
- // However, we *do* keep records alive if they are active on the service.
- {
- // TODO SERVER-29709: handle expiration separately from failure to refresh.
- }
+ // Add all of the explicitly ended sessions to the list of sessions to kill cursors for
- return Status::OK();
+ for (const auto& lsid : explicitlyEndingSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
+ }
+ SessionKiller::Matcher matcher(std::move(patterns));
+ _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore();
}
-
void LogicalSessionCacheImpl::clear() {
// TODO: What should this do? Wasn't implemented before
MONGO_UNREACHABLE;
}
-bool LogicalSessionCacheImpl::_isDead(const LogicalSessionRecord& record, Date_t now) const {
- return record.getLastUse() + _sessionTimeout < now;
+void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _endingSessions.insert(begin(sessions), end(sessions));
}
-boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache(
- LogicalSessionRecord record) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- return _cache.add(record.getId(), std::move(record));
+void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _activeSessions.insert(std::make_pair(record.getId(), record));
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
std::vector<LogicalSessionId> ret;
- ret.reserve(_cache.size());
- for (const auto& id : _cache) {
+ ret.reserve(_activeSessions.size());
+ for (const auto& id : _activeSessions) {
ret.push_back(id.first);
}
return ret;
@@ -322,7 +326,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
std::vector<LogicalSessionId> ret;
- for (const auto& it : _cache) {
+ for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
userDigests.cend()) {
ret.push_back(it.first);
@@ -334,8 +338,8 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
- const auto it = _cache.cfind(id);
- if (it == _cache.cend()) {
+ const auto it = _activeSessions.find(id);
+ if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;