diff options
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 266 |
1 files changed, 266 insertions, 0 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp new file mode 100644 index 00000000000..c6dd6df3630 --- /dev/null +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -0,0 +1,266 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/logical_session_cache_impl.h" + +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/server_parameters.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/duration.h" +#include "mongo/util/log.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize, + int, + LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity); + +MONGO_EXPORT_STARTUP_SERVER_PARAMETER( + logicalSessionRefreshMinutes, + int, + LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh.count()); + +constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; +constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; + +LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service, + std::unique_ptr<SessionsCollection> collection, + Options options) + : _refreshInterval(options.refreshInterval), + _sessionTimeout(options.sessionTimeout), + _service(std::move(service)), + _sessionsColl(std::move(collection)), + _cache(options.capacity) { + PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); }, + duration_cast<Milliseconds>(_refreshInterval)}; + _service->scheduleJob(std::move(job)); +} + +LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { + try { + _service->join(); + } catch (...) { + // If we failed to join we might still be running a background thread, + // log but swallow the error since there is no good way to recover. + severe() << "Failed to join background service thread"; + } +} + +Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + auto it = _cache.find(lsid); + if (it == _cache.end()) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + // Update the last use time before returning. + it->second.setLastUse(now()); + return Status::OK(); +} + +Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { + // Add the new record to our local cache. We will insert it into the sessions collection + // the next time _refresh is called. If there is already a record in the cache for this + // session, we'll just write over it with our newer, more recent one. + _addToCache(record); + return Status::OK(); +} + +Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, + const RefreshSessionsCmdFromClient& cmd) { + // Update the timestamps of all these records in our cache. + auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx); + for (auto& lsid : sessions) { + if (!promote(lsid).isOK()) { + // This is a new record, insert it. + _addToCache(makeLogicalSessionRecord(opCtx, lsid, now())); + } + } + + return Status::OK(); +} + +Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, + const RefreshSessionsCmdFromClusterMember& cmd) { + LogicalSessionRecordSet toRefresh{}; + + // Update the timestamps of all these records in our cache. + auto records = cmd.getRefreshSessionsInternal(); + for (auto& record : records) { + if (!promote(record.getId()).isOK()) { + // This is a new record, insert it. + _addToCache(record); + } + toRefresh.insert(record); + } + + // Write to the sessions collection now. + return _sessionsColl->refreshSessions(opCtx, toRefresh, now()); +} + +void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { + if (!promote(lsid).isOK()) { + startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())).ignore(); + } +} + +Status LogicalSessionCacheImpl::refreshNow(Client* client) { + return _refresh(client); +} + +Date_t LogicalSessionCacheImpl::now() { + return _service->now(); +} + +size_t LogicalSessionCacheImpl::size() { + stdx::lock_guard<stdx::mutex> lock(_cacheMutex); + return _cache.size(); +} + +void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { + auto res = _refresh(client); + if (!res.isOK()) { + log() << "Failed to refresh session cache: " << res; + } + + return; +} + +Status LogicalSessionCacheImpl::_refresh(Client* client) { + LogicalSessionRecordSet activeSessions; + LogicalSessionRecordSet deadSessions; + + auto time = now(); + + // We should avoid situations where we have records in the cache + // that have been expired from the sessions collection. If they haven't been + // used in _sessionTimeout, we should just remove them. + + // Assemble a list of active session records in our cache + std::vector<decltype(_cache)::ListEntry> cacheCopy; + { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + cacheCopy.assign(_cache.begin(), _cache.end()); + } + + for (auto& it : cacheCopy) { + auto record = it.second; + if (!_isDead(record, time)) { + activeSessions.insert(record); + } else { + deadSessions.insert(record); + } + } + + // Append any active sessions from the service. We should always have + // cache entries for active sessions. If we don't, then it is a sign that + // the cache needs to be larger, because active session records are being + // evicted. + + // Promote our cached entries for all active service sessions to be recently- + // used, and update their lastUse dates so we don't lose them to eviction. We + // do not need to do this with records from our own cache, which are being used + // regularly. Sessions for long-running queries, however, must be kept alive + // by us here. + auto serviceSessions = _service->getActiveSessions(); + { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + for (auto lsid : serviceSessions) { + auto it = _cache.promote(lsid); + if (it != _cache.end()) { + // If we have not found our record, it may have been removed + // by another thread. + it->second.setLastUse(time); + activeSessions.insert(it->second); + } + + // TODO SERVER-29709: Rethink how active sessions interact with refreshes, + // and potentially move this block above the block where we separate + // dead sessions from live sessions, above. + activeSessions.insert(makeLogicalSessionRecord(lsid, time)); + } + } + + // Query into the sessions collection to do the refresh. If any sessions have + // failed to refresh, it means their authoritative records were removed, and + // we should remove such records from our cache as well. + { + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + + auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time); + if (!res.isOK()) { + // TODO SERVER-29709: handle network errors here. + return res; + } + } + + // Prune any dead records out of the cache. Dead records are ones that failed to + // refresh, or ones that have expired locally. We don't make an effort to check + // if the locally-expired records still have live authoritative records in the + // sessions collection. We also don't attempt to resurrect our expired records. + // However, we *do* keep records alive if they are active on the service. + { + // TODO SERVER-29709: handle expiration separately from failure to refresh. + } + + return Status::OK(); +} + + +void LogicalSessionCacheImpl::clear() { + // TODO: What should this do? Wasn't implemented before + MONGO_UNREACHABLE; +} + +bool LogicalSessionCacheImpl::_isDead(const LogicalSessionRecord& record, Date_t now) const { + return record.getLastUse() + _sessionTimeout < now; +} + +boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache( + LogicalSessionRecord record) { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + return _cache.add(record.getId(), std::move(record)); +} + +} // namespace mongo |