From c2293992d5672c7c7f1c5d94628924ea91a78316 Mon Sep 17 00:00:00 2001 From: Samantha Ritter Date: Fri, 2 Jun 2017 13:56:38 -0400 Subject: SERVER-28300 Implement the logical session cache --- src/mongo/base/error_codes.err | 1 + src/mongo/db/SConscript | 13 ++ src/mongo/db/logical_session_cache.cpp | 226 ++++++++++++++++++++++++++++++++ src/mongo/db/logical_session_cache.h | 159 ++++++++++++++++++++++ src/mongo/db/logical_session_id.h | 3 +- src/mongo/db/logical_session_record.cpp | 17 ++- src/mongo/db/logical_session_record.h | 29 +++- 7 files changed, 440 insertions(+), 8 deletions(-) create mode 100644 src/mongo/db/logical_session_cache.cpp create mode 100644 src/mongo/db/logical_session_cache.h diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err index 3754363d9e7..3477e4b1639 100644 --- a/src/mongo/base/error_codes.err +++ b/src/mongo/base/error_codes.err @@ -210,6 +210,7 @@ error_code("StaleClusterTime", 209) error_code("CannotVerifyAndSignLogicalTime", 210) error_code("KeyNotFound", 211) error_code("IncompatibleRollbackAlgorithm", 212) +error_code("DuplicateSession", 213) # Error codes 4000-8999 are reserved. diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 5f644811d01..79aaf13f2c2 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -936,6 +936,19 @@ env.Library( LIBDEPS=[ ], ) + +env.Library( + target='logical_session_cache', + source=[ + 'logical_session_cache.cpp', + ], + LIBDEPS=[ + 'logical_session_record', + 'sessions_collection', + 'service_liason', + ], +) + env.Library( target='logical_time', source=[ diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp new file mode 100644 index 00000000000..902bd9c9a02 --- /dev/null +++ b/src/mongo/db/logical_session_cache.cpp @@ -0,0 +1,226 @@ +/** + * Copyright (C) 2017 MongoDB Inc. 
+ * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. 
+ */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/logical_session_cache.h" + +#include "mongo/util/duration.h" +#include "mongo/util/log.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity; +constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultTimeout; +constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh; + +LogicalSessionCache::LogicalSessionCache(std::unique_ptr service, + std::unique_ptr collection, + Options options) + : _refreshInterval(options.refreshInterval), + _sessionTimeout(options.sessionTimeout), + _service(std::move(service)), + _sessionsColl(std::move(collection)), + _cache(options.capacity) { + PeriodicRunner::PeriodicJob job{[this] { _refresh(); }, + duration_cast(_refreshInterval)}; + _service->scheduleJob(std::move(job)); +} + +LogicalSessionCache::~LogicalSessionCache() { + try { + _service->join(); + } catch (...) { + // If we failed to join we might still be running a background thread, + // log but swallow the error since there is no good way to recover. + severe() << "Failed to join background service thread"; + } +} + +StatusWith LogicalSessionCache::getOwner(LogicalSessionId lsid) { + // Search our local cache first + auto owner = getOwnerFromCache(lsid); + if (owner.isOK()) { + return owner; + } + + // Cache miss, must fetch from the sessions collection. + auto res = _sessionsColl->fetchRecord(lsid); + + // If we got a valid record, add it to our cache. + if (res.isOK()) { + auto& record = res.getValue(); + record.setLastUse(_service->now()); + + auto oldRecord = _addToCache(record); + + // If we had a conflicting record for this id, and they aren't the same record, + // it could mean that an interloper called endSession and startSession for the + // same lsid while we were fetching its record from the sessions collection. 
+ // This means our session has been written over, do not allow the caller to use it. + // Note: we could find expired versions of our same record here, but they'll compare equal. + if (oldRecord && *oldRecord != record) { + return {ErrorCodes::NoSuchSession, "no matching session record found"}; + } + + return record.getSessionOwner(); + } + + return res.getStatus(); +} + +StatusWith LogicalSessionCache::getOwnerFromCache( + LogicalSessionId lsid) { + std::unique_lock lk(_cacheMutex); + auto it = _cache.find(lsid); + if (it == _cache.end()) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + // Do not use records if they have expired. + auto now = _service->now(); + if (_isDead(it->second, now)) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + // Update the last use time before returning. + it->second.setLastUse(now); + return it->second.getSessionOwner(); +} + +Status LogicalSessionCache::startSession(LogicalSessionRecord authoritativeRecord) { + // Make sure the timestamp makes sense + auto now = _service->now(); + authoritativeRecord.setLastUse(now); + + // Attempt to insert into the sessions collection first. This collection enforces + // unique session ids, so it will act as concurrency control for us. + auto res = _sessionsColl->insertRecord(authoritativeRecord); + if (!res.isOK()) { + return res; + } + + // Add the new record to our local cache. If we get a conflict here, and the + // conflicting record is not dead and is not equal to our record, an interloper + // may have ended this session and then created a new one with the same id. + // In this case, return a failure. 
+ auto oldRecord = _addToCache(authoritativeRecord); + if (oldRecord) { + if (*oldRecord != authoritativeRecord) { + if (!_isDead(*oldRecord, now)) { + return {ErrorCodes::DuplicateSession, "session with this id already exists"}; + } + } + } + + return Status::OK(); +} + +void LogicalSessionCache::_refresh() { + SessionList activeSessions; + SessionList deadSessions; + + auto now = _service->now(); + + // We should avoid situations where we have records in the cache + // that have been expired from the sessions collection. If they haven't been + // used in _sessionTimeout, we should just remove them. + + // Assemble a list of active session records in our cache + std::vector cacheCopy; + { + stdx::unique_lock lk(_cacheMutex); + cacheCopy.assign(_cache.begin(), _cache.end()); + } + + for (auto& it : cacheCopy) { + auto record = it.second; + if (!_isDead(record, now)) { + activeSessions.push_back(record.getLsid()); + } else { + deadSessions.push_back(record.getLsid()); + } + } + + // Append any active sessions from the service. We should always have + // cache entries for active sessions. If we don't, then it is a sign that + // the cache needs to be larger, because active session records are being + // evicted. + + // Promote our cached entries for all active service sessions to be recently- + // used, and update their lastUse dates so we don't lose them to eviction. We + // do not need to do this with records from our own cache, which are being used + // regularly. Sessions for long-running queries, however, must be kept alive + // by us here. + auto serviceSessions = _service->getActiveSessions(); + { + stdx::unique_lock lk(_cacheMutex); + for (auto lsid : serviceSessions) { + auto it = _cache.promote(lsid); + if (it != _cache.end()) { + // If we have not found our record, it may have been removed + // by another thread. 
+ it->second.setLastUse(now); + } + } + } + + activeSessions.splice(activeSessions.begin(), serviceSessions); + + // Query into the sessions collection to do the refresh. If any sessions have + // failed to refresh, it means their authoritative records were removed, and + // we should remove such records from our cache as well. + auto failedToRefresh = _sessionsColl->refreshSessions(std::move(activeSessions)); + deadSessions.splice(deadSessions.begin(), failedToRefresh); + + // Prune any dead records out of the cache. Dead records are ones that failed to + // refresh, or ones that have expired locally. We don't make an effort to check + // if the locally-expired records still have live authoritative records in the + // sessions collection. We also don't attempt to resurrect our expired records. + { + stdx::unique_lock lk(_cacheMutex); + for (auto deadId : deadSessions) { + _cache.erase(deadId); + } + } +} + +bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const { + return record.getLastUse() + _sessionTimeout < now; +} + +boost::optional LogicalSessionCache::_addToCache( + LogicalSessionRecord record) { + stdx::unique_lock lk(_cacheMutex); + return _cache.add(record.getLsid(), std::move(record)); +} + +} // namespace mongo diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h new file mode 100644 index 00000000000..f7aa671bf39 --- /dev/null +++ b/src/mongo/db/logical_session_cache.h @@ -0,0 +1,159 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. 
+ * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/base/status_with.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_record.h" +#include "mongo/db/service_liason.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/stdx/thread.h" +#include "mongo/util/lru_cache.h" + +namespace mongo { + +/** + * A thread-safe cache structure for logical session records. + * + * The cache takes ownership of the passed-in ServiceLiason and + * SessionsCollection helper types. + */ +class LogicalSessionCache { +public: + using SessionList = std::list; + + static constexpr int kLogicalSessionCacheDefaultCapacity = 10000; + static constexpr Minutes kLogicalSessionDefaultTimeout = Minutes(30); + static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5); + + /** + * An Options type to support the LogicalSessionCache. + */ + struct Options { + Options(){}; + + /** + * The number of session records to keep in the cache. 
+ */ + int capacity = kLogicalSessionCacheDefaultCapacity; + + /** + * A timeout value to use for sessions in the cache, in minutes. + * + * By default, this is set to 30 minutes. + */ + Minutes sessionTimeout = kLogicalSessionDefaultTimeout; + + /** + * The interval over which the cache will refresh session records. + * + * By default, this is set to every 5 minutes. If the caller is + * setting the sessionTimeout by hand, it is suggested that they + * consider also setting the refresh interval accordingly. + */ + Minutes refreshInterval = kLogicalSessionDefaultRefresh; + }; + + /** + * Construct a new session cache. + */ + explicit LogicalSessionCache(std::unique_ptr service, + std::unique_ptr collection, + Options options = Options{}); + + LogicalSessionCache(const LogicalSessionCache&) = delete; + LogicalSessionCache operator=(const LogicalSessionCache&) = delete; + + ~LogicalSessionCache(); + + /** + * Returns the owner for the given session, or return an error if there + * is no authoritative record for this session. + * + * If the cache does not already contain a record for this session, this + * method may issue networking operations to obtain the record. Afterwards, + * the cache will keep the record for future use. + * + * This call will promote any record it touches to be the most-recently-used + * record in the cache. + */ + StatusWith getOwner(LogicalSessionId lsid); + + /** + * Returns the owner for the given session if we already have its record in the + * cache. Do not fetch the record from the network if we do not already have it. + * + * This call will promote any record it touches to be the most-recently-used + * record in the cache. + */ + StatusWith getOwnerFromCache(LogicalSessionId lsid); + + /** + * Inserts a new authoritative session record into the cache. This method will + * insert the authoritative record into the sessions collection. 
This method + * should only be used when starting new sessions and should not be used to + * insert records for existing sessions. + */ + Status startSession(LogicalSessionRecord authoritativeRecord); + + /** + * Removes all local records in this cache. Does not remove the corresponding + * authoritative session records from the sessions collection. + */ + void clear(); + +private: + /** + * Internal methods to handle scheduling and perform refreshes for active + * session records contained within the cache. + */ + void _refresh(); + + /** + * Returns true if a record has passed its given expiration. + */ + bool _isDead(const LogicalSessionRecord& record, Date_t now) const; + + /** + * Takes the lock and inserts the given record into the cache. + */ + boost::optional _addToCache(LogicalSessionRecord record); + + const Minutes _refreshInterval; + const Minutes _sessionTimeout; + + std::unique_ptr _service; + std::unique_ptr _sessionsColl; + + stdx::mutex _cacheMutex; + LRUCache _cache; +}; + +} // namespace mongo diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 50697ee4e0b..081d4cdec96 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -100,13 +100,12 @@ public: UUID::Hash _hasher; }; - -private: /** * This constructor exists for IDL only. */ LogicalSessionId(); +private: /** * Construct a LogicalSessionId from a UUID. 
*/ diff --git a/src/mongo/db/logical_session_record.cpp b/src/mongo/db/logical_session_record.cpp index 3b3f569f96b..d6904a36f5f 100644 --- a/src/mongo/db/logical_session_record.cpp +++ b/src/mongo/db/logical_session_record.cpp @@ -55,8 +55,9 @@ StatusWith LogicalSessionRecord::parse(const BSONObj& bson LogicalSessionRecord LogicalSessionRecord::makeAuthoritativeRecord(LogicalSessionId id, UserName user, - boost::optional userId) { - return LogicalSessionRecord(std::move(id), std::move(user), std::move(userId)); + boost::optional userId, + Date_t now) { + return LogicalSessionRecord(std::move(id), std::move(user), std::move(userId), std::move(now)); } BSONObj LogicalSessionRecord::toBSON() const { @@ -65,12 +66,20 @@ BSONObj LogicalSessionRecord::toBSON() const { return builder.obj(); } +std::string LogicalSessionRecord::toString() const { + return str::stream() << "LogicalSessionRecord" + << " Id: '" << getLsid() << "'" + << " Owner name: '" << getOwner().getUserName() << "'" + << " Last-use: " << getLastUse().toString(); +} + LogicalSessionRecord::LogicalSessionRecord(LogicalSessionId id, UserName user, - boost::optional userId) + boost::optional userId, + Date_t now) : _owner(std::make_pair(std::move(user), std::move(userId))) { setLsid(std::move(id)); - setLastUse(Date_t::now()); + setLastUse(now); Session_owner owner; owner.setUserName(_owner.first.getUser()); diff --git a/src/mongo/db/logical_session_record.h b/src/mongo/db/logical_session_record.h index 7ab60dc94dd..e576cd82eb5 100644 --- a/src/mongo/db/logical_session_record.h +++ b/src/mongo/db/logical_session_record.h @@ -71,13 +71,27 @@ public: */ static LogicalSessionRecord makeAuthoritativeRecord(LogicalSessionId id, UserName user, - boost::optional userId); + boost::optional userId, + Date_t now); /** * Return a BSON representation of this session record. */ BSONObj toBSON() const; + /** + * Return a string representation of this session record. 
+ */ + std::string toString() const; + + inline bool operator==(const LogicalSessionRecord& rhs) const { + return getLsid() == rhs.getLsid() && getSessionOwner() == rhs.getSessionOwner(); + } + + inline bool operator!=(const LogicalSessionRecord& rhs) const { + return !(*this == rhs); + } + /** * Return the username and id of the User who owns this session. Only a User * that matches both the name and id returned by this method should be @@ -93,9 +107,20 @@ public: private: LogicalSessionRecord() = default; - LogicalSessionRecord(LogicalSessionId id, UserName user, boost::optional userId); + LogicalSessionRecord(LogicalSessionId id, + UserName user, + boost::optional userId, + Date_t now); Owner _owner; }; +inline std::ostream& operator<<(std::ostream& s, const LogicalSessionRecord& record) { + return (s << record.toString()); +} + +inline StringBuilder& operator<<(StringBuilder& s, const LogicalSessionRecord& record) { + return (s << record.toString()); +} + } // namespace mongo -- cgit v1.2.1