/**
* Copyright (C) 2017 MongoDB Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
#include "mongo/platform/basic.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
#include "mongo/util/periodic_runner.h"
namespace mongo {
namespace {
const auto getLogicalSessionCache =
ServiceContext::declareDecoration>();
} // namespace
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize,
int,
LogicalSessionCache::kLogicalSessionCacheDefaultCapacity);
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRefreshMinutes,
int,
LogicalSessionCache::kLogicalSessionDefaultRefresh.count());
constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh;
LogicalSessionCache* LogicalSessionCache::get(ServiceContext* service) {
return getLogicalSessionCache(service).get();
}
LogicalSessionCache* LogicalSessionCache::get(OperationContext* ctx) {
return get(ctx->getClient()->getServiceContext());
}
void LogicalSessionCache::set(ServiceContext* service,
std::unique_ptr sessionCache) {
auto& cache = getLogicalSessionCache(service);
cache = std::move(sessionCache);
}
LogicalSessionCache::LogicalSessionCache(std::unique_ptr service,
std::unique_ptr collection,
Options options)
: _refreshInterval(options.refreshInterval),
_sessionTimeout(options.sessionTimeout),
_service(std::move(service)),
_sessionsColl(std::move(collection)),
_cache(options.capacity) {
PeriodicRunner::PeriodicJob job{[this](Client* client) { _refresh(client); },
duration_cast(_refreshInterval)};
_service->scheduleJob(std::move(job));
}
LogicalSessionCache::~LogicalSessionCache() {
try {
_service->join();
} catch (...) {
// If we failed to join we might still be running a background thread,
// log but swallow the error since there is no good way to recover.
severe() << "Failed to join background service thread";
}
}
// TODO: fetch should attempt to update user info, if it is not in the found record.
Status LogicalSessionCache::fetchAndPromote(OperationContext* opCtx, const LogicalSessionId& lsid) {
// Search our local cache first
auto promoteRes = promote(lsid);
if (promoteRes.isOK()) {
return promoteRes;
}
// Cache miss, must fetch from the sessions collection.
auto res = _sessionsColl->fetchRecord(opCtx, lsid);
// If we got a valid record, add it to our cache.
if (res.isOK()) {
auto& record = res.getValue();
record.setLastUse(now());
// Any duplicate records here are actually the same record with different
// lastUse times, ignore them.
auto oldRecord = _addToCache(record);
return Status::OK();
}
// If we could not get a valid record, return the error.
return res.getStatus();
}
Status LogicalSessionCache::promote(LogicalSessionId lsid) {
stdx::unique_lock lk(_cacheMutex);
auto it = _cache.find(lsid);
if (it == _cache.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
}
// Do not use records if they have expired.
auto time = now();
if (_isDead(it->second, time)) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
}
// Update the last use time before returning.
it->second.setLastUse(time);
return Status::OK();
}
Status LogicalSessionCache::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
// Add the new record to our local cache. We will insert it into the sessions collection
// the next time _refresh is called.
// If we get a conflict here, then an interloper may have ended this session
// and then created a new one with the same id. In this case, return a failure.
auto oldRecord = _addToCache(record);
if (oldRecord) {
if (*oldRecord != record) {
if (!_isDead(*oldRecord, now())) {
return {ErrorCodes::DuplicateSession, "session with this id already exists"};
}
}
}
return Status::OK();
}
void LogicalSessionCache::refreshNow(Client* client) {
return _refresh(client);
}
Date_t LogicalSessionCache::now() {
return _service->now();
}
void LogicalSessionCache::_refresh(Client* client) {
LogicalSessionRecordSet activeSessions;
LogicalSessionRecordSet deadSessions;
auto time = now();
// We should avoid situations where we have records in the cache
// that have been expired from the sessions collection. If they haven't been
// used in _sessionTimeout, we should just remove them.
// Assemble a list of active session records in our cache
std::vector cacheCopy;
{
stdx::unique_lock lk(_cacheMutex);
cacheCopy.assign(_cache.begin(), _cache.end());
}
for (auto& it : cacheCopy) {
auto record = it.second;
if (!_isDead(record, time)) {
activeSessions.insert(record);
} else {
deadSessions.insert(record);
}
}
// Append any active sessions from the service. We should always have
// cache entries for active sessions. If we don't, then it is a sign that
// the cache needs to be larger, because active session records are being
// evicted.
// Promote our cached entries for all active service sessions to be recently-
// used, and update their lastUse dates so we don't lose them to eviction. We
// do not need to do this with records from our own cache, which are being used
// regularly. Sessions for long-running queries, however, must be kept alive
// by us here.
auto serviceSessions = _service->getActiveSessions();
{
stdx::unique_lock lk(_cacheMutex);
for (auto lsid : serviceSessions) {
auto it = _cache.promote(lsid);
if (it != _cache.end()) {
// If we have not found our record, it may have been removed
// by another thread.
it->second.setLastUse(time);
activeSessions.insert(it->second);
}
// TODO SERVER-29709: Rethink how active sessions interact with refreshes,
// and potentially move this block above the block where we separate
// dead sessions from live sessions, above.
activeSessions.insert(makeLogicalSessionRecord(lsid, time));
}
}
// Query into the sessions collection to do the refresh. If any sessions have
// failed to refresh, it means their authoritative records were removed, and
// we should remove such records from our cache as well.
{
boost::optional uniqueCtx;
auto* const opCtx = [&client, &uniqueCtx] {
if (client->getOperationContext()) {
return client->getOperationContext();
}
uniqueCtx.emplace(client->makeOperationContext());
return uniqueCtx->get();
}();
auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time);
if (!res.isOK()) {
// TODO SERVER-29709: handle network errors here.
return;
}
}
// Prune any dead records out of the cache. Dead records are ones that failed to
// refresh, or ones that have expired locally. We don't make an effort to check
// if the locally-expired records still have live authoritative records in the
// sessions collection. We also don't attempt to resurrect our expired records.
// However, we *do* keep records alive if they are active on the service.
{
// TODO SERVER-29709: handle expiration separately from failure to refresh.
}
}
bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const {
return record.getLastUse() + _sessionTimeout < now;
}
boost::optional LogicalSessionCache::_addToCache(
LogicalSessionRecord record) {
stdx::unique_lock lk(_cacheMutex);
return _cache.add(record.getId(), std::move(record));
}
} // namespace mongo