diff options
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 216 |
1 files changed, 110 insertions, 106 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index ffafcccbd4f..624934c2785 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -41,13 +41,10 @@ #include "mongo/util/duration.h" #include "mongo/util/log.h" #include "mongo/util/periodic_runner.h" +#include "mongo/util/scopeguard.h" namespace mongo { -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize, - int, - LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity); - MONGO_EXPORT_STARTUP_SERVER_PARAMETER( logicalSessionRefreshMinutes, int, @@ -55,7 +52,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER( MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false); -constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; LogicalSessionCacheImpl::LogicalSessionCacheImpl( @@ -67,8 +63,7 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( _sessionTimeout(options.sessionTimeout), _service(std::move(service)), _sessionsColl(std::move(collection)), - _transactionReaper(std::move(transactionReaper)), - _cache(options.capacity) { + _transactionReaper(std::move(transactionReaper)) { if (!disableLogicalSessionCacheRefresh) { _service->scheduleJob( {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); @@ -88,30 +83,27 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { } Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - auto it = _cache.find(lsid); - if (it == _cache.end()) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + auto it = _activeSessions.find(lsid); + if (it == _activeSessions.end()) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; } - // Update the last use time before returning. - it->second.setLastUse(now()); return Status::OK(); } -Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { +void LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { // Add the new record to our local cache. We will insert it into the sessions collection // the next time _refresh is called. If there is already a record in the cache for this // session, we'll just write over it with our newer, more recent one. _addToCache(record); - return Status::OK(); } Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClient& cmd) { // Update the timestamps of all these records in our cache. auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx); - for (auto& lsid : sessions) { + for (const auto& lsid : sessions) { if (!promote(lsid).isOK()) { // This is a new record, insert it. _addToCache(makeLogicalSessionRecord(opCtx, lsid, now())); @@ -127,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, // Update the timestamps of all these records in our cache. auto records = cmd.getRefreshSessionsInternal(); - for (auto& record : records) { + for (const auto& record : records) { if (!promote(record.getId()).isOK()) { // This is a new record, insert it. _addToCache(record); @@ -136,17 +128,22 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, } // Write to the sessions collection now. - return _sessionsColl->refreshSessions(opCtx, toRefresh, now()); + return _sessionsColl->refreshSessions(opCtx, toRefresh); } void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { if (!promote(lsid).isOK()) { - startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())).ignore(); + startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())); } } Status LogicalSessionCacheImpl::refreshNow(Client* client) { - return _refresh(client); + try { + _refresh(client); + } catch (...) { + return exceptionToStatus(); + } + return Status::OK(); } Status LogicalSessionCacheImpl::reapNow(Client* client) { @@ -159,16 +156,15 @@ Date_t LogicalSessionCacheImpl::now() { size_t LogicalSessionCacheImpl::size() { stdx::lock_guard<stdx::mutex> lock(_cacheMutex); - return _cache.size(); + return _activeSessions.size(); } void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { - auto res = _refresh(client); - if (!res.isOK()) { - log() << "Failed to refresh session cache: " << res; + try { + _refresh(client); + } catch (...) { + log() << "Failed to refresh session cache: " << exceptionToStatus(); } - - return; } void LogicalSessionCacheImpl::_periodicReap(Client* client) { @@ -204,115 +200,123 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } -Status LogicalSessionCacheImpl::_refresh(Client* client) { - LogicalSessionRecordSet activeSessions; - LogicalSessionRecordSet deadSessions; - - auto time = now(); - - // We should avoid situations where we have records in the cache - // that have been expired from the sessions collection. If they haven't been - // used in _sessionTimeout, we should just remove them. +void LogicalSessionCacheImpl::_refresh(Client* client) { + LogicalSessionIdSet staleSessions; + LogicalSessionIdSet explicitlyEndingSessions; + LogicalSessionIdMap<LogicalSessionRecord> activeSessions; + + // backSwapper creates a guard that in the case of a exception + // replaces the ending or active sessions that swapped out of of LogicalSessionCache, + // and merges in any records that had been added since we swapped them + // out. + auto backSwapper = [this](auto& member, auto& temp) { + return MakeGuard([this, &member, &temp] { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + using std::swap; + swap(member, temp); + for (const auto& it : temp) { + member.emplace(it); + } + }); + }; - // Assemble a list of active session records in our cache - std::vector<decltype(_cache)::ListEntry> cacheCopy; { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - cacheCopy.assign(_cache.begin(), _cache.end()); + using std::swap; + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + swap(explicitlyEndingSessions, _endingSessions); + swap(activeSessions, _activeSessions); } + auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions); + auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions); - for (auto& it : cacheCopy) { - auto record = it.second; - if (!_isDead(record, time)) { - activeSessions.insert(record); - } else { - deadSessions.insert(record); + // get or make an opCtx + + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + + // remove all explicitlyEndingSessions from activeSessions + for (const auto& lsid : explicitlyEndingSessions) { + activeSessions.erase(lsid); } - // Append any active sessions from the service. We should always have - // cache entries for active sessions. If we don't, then it is a sign that - // the cache needs to be larger, because active session records are being - // evicted. - - // Promote our cached entries for all active service sessions to be recently- - // used, and update their lastUse dates so we don't lose them to eviction. We - // do not need to do this with records from our own cache, which are being used - // regularly. Sessions for long-running queries, however, must be kept alive - // by us here. - auto serviceSessions = _service->getActiveSessions(); - { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - for (auto lsid : serviceSessions) { - auto it = _cache.promote(lsid); - if (it != _cache.end()) { - // If we have not found our record, it may have been removed - // by another thread. - it->second.setLastUse(time); - activeSessions.insert(it->second); - } + // refresh all recently active sessions as well as for sessions attached to running ops - // TODO SERVER-29709: Rethink how active sessions interact with refreshes, - // and potentially move this block above the block where we separate - // dead sessions from live sessions, above. - activeSessions.insert(makeLogicalSessionRecord(lsid, time)); + LogicalSessionRecordSet activeSessionRecords{}; + + auto runningOpSessions = _service->getActiveOpSessions(); + + for (const auto& it : runningOpSessions) { + // if a running op is the cause of an upsert, we won't have a user name for the record + if (explicitlyEndingSessions.count(it) > 0) { + continue; } + LogicalSessionRecord lsr; + lsr.setId(it); + activeSessionRecords.insert(lsr); + } + for (const auto& it : activeSessions) { + activeSessionRecords.insert(it.second); } + // refresh the active sessions in the sessions collection + uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); + activeSessionsBackSwapper.Dismiss(); - // Query into the sessions collection to do the refresh. If any sessions have - // failed to refresh, it means their authoritative records were removed, and - // we should remove such records from our cache as well. - { - boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; - auto* const opCtx = [&client, &uniqueCtx] { - if (client->getOperationContext()) { - return client->getOperationContext(); - } + // remove the ending sessions from the sessions collection + uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); + explicitlyEndingBackSwaper.Dismiss(); - uniqueCtx.emplace(client->makeOperationContext()); - return uniqueCtx->get(); - }(); + // Find which running, but not recently active sessions, are expired, and add them + // to the list of sessions to kill cursors for + + KillAllSessionsByPatternSet patterns; + + auto openCursorSessions = _service->getOpenCursorSessions(); + // think about pruning ending and active out of openCursorSessions + auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); - auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time); - if (!res.isOK()) { - // TODO SERVER-29709: handle network errors here. - return res; + if (statusAndRemovedSessions.isOK()) { + auto removedSessions = statusAndRemovedSessions.getValue(); + for (const auto& lsid : removedSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } } - // Prune any dead records out of the cache. Dead records are ones that failed to - // refresh, or ones that have expired locally. We don't make an effort to check - // if the locally-expired records still have live authoritative records in the - // sessions collection. We also don't attempt to resurrect our expired records. - // However, we *do* keep records alive if they are active on the service. - { - // TODO SERVER-29709: handle expiration separately from failure to refresh. - } + // Add all of the explicitly ended sessions to the list of sessions to kill cursors for - return Status::OK(); + for (const auto& lsid : explicitlyEndingSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); + } + SessionKiller::Matcher matcher(std::move(patterns)); + _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore(); } - void LogicalSessionCacheImpl::clear() { // TODO: What should this do? Wasn't implemented before MONGO_UNREACHABLE; } -bool LogicalSessionCacheImpl::_isDead(const LogicalSessionRecord& record, Date_t now) const { - return record.getLastUse() + _sessionTimeout < now; +void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _endingSessions.insert(begin(sessions), end(sessions)); } -boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache( - LogicalSessionRecord record) { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - return _cache.add(record.getId(), std::move(record)); +void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _activeSessions.insert(std::make_pair(record.getId(), record)); } std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); std::vector<LogicalSessionId> ret; - ret.reserve(_cache.size()); - for (const auto& id : _cache) { + ret.reserve(_activeSessions.size()); + for (const auto& id : _activeSessions) { ret.push_back(id.first); } return ret; @@ -322,7 +326,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( const std::vector<SHA256Block>& userDigests) const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); std::vector<LogicalSessionId> ret; - for (const auto& it : _cache) { + for (const auto& it : _activeSessions) { if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != userDigests.cend()) { ret.push_back(it.first); @@ -334,8 +338,8 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( const LogicalSessionId& id) const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); - const auto it = _cache.cfind(id); - if (it == _cache.cend()) { + const auto it = _activeSessions.find(id); + if (it == _activeSessions.end()) { return boost::none; } return it->second; |