diff options
author | Samantha Ritter <samantha.ritter@10gen.com> | 2017-06-02 13:56:38 -0400 |
---|---|---|
committer | samantharitter <samantha.ritter@10gen.com> | 2017-06-05 13:29:41 -0400 |
commit | c2293992d5672c7c7f1c5d94628924ea91a78316 (patch) | |
tree | 906ef7b8e404c1b60ee72056a564384f46fe4cc6 /src/mongo/db/logical_session_cache.cpp | |
parent | 43cc5caeaef92cab569dbe395a5ff8aae36edd0f (diff) | |
download | mongo-c2293992d5672c7c7f1c5d94628924ea91a78316.tar.gz |
SERVER-28300 Implement the logical session cache
Diffstat (limited to 'src/mongo/db/logical_session_cache.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache.cpp | 226 |
1 files changed, 226 insertions, 0 deletions
diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp new file mode 100644 index 00000000000..902bd9c9a02 --- /dev/null +++ b/src/mongo/db/logical_session_cache.cpp @@ -0,0 +1,226 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/logical_session_cache.h" + +#include "mongo/util/duration.h" +#include "mongo/util/log.h" +#include "mongo/util/periodic_runner.h" + +namespace mongo { + +constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity; +constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultTimeout; +constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh; + +LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service, + std::unique_ptr<SessionsCollection> collection, + Options options) + : _refreshInterval(options.refreshInterval), + _sessionTimeout(options.sessionTimeout), + _service(std::move(service)), + _sessionsColl(std::move(collection)), + _cache(options.capacity) { + PeriodicRunner::PeriodicJob job{[this] { _refresh(); }, + duration_cast<Milliseconds>(_refreshInterval)}; + _service->scheduleJob(std::move(job)); +} + +LogicalSessionCache::~LogicalSessionCache() { + try { + _service->join(); + } catch (...) { + // If we failed to join we might still be running a background thread, + // log but swallow the error since there is no good way to recover. + severe() << "Failed to join background service thread"; + } +} + +StatusWith<LogicalSessionRecord::Owner> LogicalSessionCache::getOwner(LogicalSessionId lsid) { + // Search our local cache first + auto owner = getOwnerFromCache(lsid); + if (owner.isOK()) { + return owner; + } + + // Cache miss, must fetch from the sessions collection. + auto res = _sessionsColl->fetchRecord(lsid); + + // If we got a valid record, add it to our cache. + if (res.isOK()) { + auto& record = res.getValue(); + record.setLastUse(_service->now()); + + auto oldRecord = _addToCache(record); + + // If we had a conflicting record for this id, and they aren't the same record, + // it could mean that an interloper called endSession and startSession for the + // same lsid while we were fetching its record from the sessions collection. + // This means our session has been written over, do not allow the caller to use it. + // Note: we could find expired versions of our same record here, but they'll compare equal. + if (oldRecord && *oldRecord != record) { + return {ErrorCodes::NoSuchSession, "no matching session record found"}; + } + + return record.getSessionOwner(); + } + + return res.getStatus(); +} + +StatusWith<LogicalSessionRecord::Owner> LogicalSessionCache::getOwnerFromCache( + LogicalSessionId lsid) { + std::unique_lock<stdx::mutex> lk(_cacheMutex); + auto it = _cache.find(lsid); + if (it == _cache.end()) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + // Do not use records if they have expired. + auto now = _service->now(); + if (_isDead(it->second, now)) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + // Update the last use time before returning. + it->second.setLastUse(now); + return it->second.getSessionOwner(); +} + +Status LogicalSessionCache::startSession(LogicalSessionRecord authoritativeRecord) { + // Make sure the timestamp makes sense + auto now = _service->now(); + authoritativeRecord.setLastUse(now); + + // Attempt to insert into the sessions collection first. This collection enforces + // unique session ids, so it will act as concurrency control for us. + auto res = _sessionsColl->insertRecord(authoritativeRecord); + if (!res.isOK()) { + return res; + } + + // Add the new record to our local cache. If we get a conflict here, and the + // conflicting record is not dead and is not equal to our record, an interloper + // may have ended this session and then created a new one with the same id. + // In this case, return a failure. + auto oldRecord = _addToCache(authoritativeRecord); + if (oldRecord) { + if (*oldRecord != authoritativeRecord) { + if (!_isDead(*oldRecord, now)) { + return {ErrorCodes::DuplicateSession, "session with this id already exists"}; + } + } + } + + return Status::OK(); +} + +void LogicalSessionCache::_refresh() { + SessionList activeSessions; + SessionList deadSessions; + + auto now = _service->now(); + + // We should avoid situations where we have records in the cache + // that have been expired from the sessions collection. If they haven't been + // used in _sessionTimeout, we should just remove them. + + // Assemble a list of active session records in our cache + std::vector<decltype(_cache)::ListEntry> cacheCopy; + { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + cacheCopy.assign(_cache.begin(), _cache.end()); + } + + for (auto& it : cacheCopy) { + auto record = it.second; + if (!_isDead(record, now)) { + activeSessions.push_back(record.getLsid()); + } else { + deadSessions.push_back(record.getLsid()); + } + } + + // Append any active sessions from the service. We should always have + // cache entries for active sessions. If we don't, then it is a sign that + // the cache needs to be larger, because active session records are being + // evicted. + + // Promote our cached entries for all active service sessions to be recently- + // used, and update their lastUse dates so we don't lose them to eviction. We + // do not need to do this with records from our own cache, which are being used + // regularly. Sessions for long-running queries, however, must be kept alive + // by us here. + auto serviceSessions = _service->getActiveSessions(); + { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + for (auto lsid : serviceSessions) { + auto it = _cache.promote(lsid); + if (it != _cache.end()) { + // If we have not found our record, it may have been removed + // by another thread. + it->second.setLastUse(now); + } + } + } + + activeSessions.splice(activeSessions.begin(), serviceSessions); + + // Query into the sessions collection to do the refresh. If any sessions have + // failed to refresh, it means their authoritative records were removed, and + // we should remove such records from our cache as well. + auto failedToRefresh = _sessionsColl->refreshSessions(std::move(activeSessions)); + deadSessions.splice(deadSessions.begin(), failedToRefresh); + + // Prune any dead records out of the cache. Dead records are ones that failed to + // refresh, or ones that have expired locally. We don't make an effort to check + // if the locally-expired records still have live authoritative records in the + // sessions collection. We also don't attempt to resurrect our expired records. + { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + for (auto deadId : deadSessions) { + _cache.erase(deadId); + } + } +} + +bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const { + return record.getLastUse() + _sessionTimeout < now; +} + +boost::optional<LogicalSessionRecord> LogicalSessionCache::_addToCache( + LogicalSessionRecord record) { + stdx::unique_lock<stdx::mutex> lk(_cacheMutex); + return _cache.add(record.getLsid(), std::move(record)); +} + +} // namespace mongo |