summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSamantha Ritter <samantha.ritter@10gen.com>2017-06-02 13:56:38 -0400
committersamantharitter <samantha.ritter@10gen.com>2017-06-05 13:29:41 -0400
commitc2293992d5672c7c7f1c5d94628924ea91a78316 (patch)
tree906ef7b8e404c1b60ee72056a564384f46fe4cc6
parent43cc5caeaef92cab569dbe395a5ff8aae36edd0f (diff)
downloadmongo-c2293992d5672c7c7f1c5d94628924ea91a78316.tar.gz
SERVER-28300 Implement the logical session cache
-rw-r--r--src/mongo/base/error_codes.err1
-rw-r--r--src/mongo/db/SConscript13
-rw-r--r--src/mongo/db/logical_session_cache.cpp226
-rw-r--r--src/mongo/db/logical_session_cache.h159
-rw-r--r--src/mongo/db/logical_session_id.h3
-rw-r--r--src/mongo/db/logical_session_record.cpp17
-rw-r--r--src/mongo/db/logical_session_record.h29
7 files changed, 440 insertions, 8 deletions
diff --git a/src/mongo/base/error_codes.err b/src/mongo/base/error_codes.err
index 3754363d9e7..3477e4b1639 100644
--- a/src/mongo/base/error_codes.err
+++ b/src/mongo/base/error_codes.err
@@ -210,6 +210,7 @@ error_code("StaleClusterTime", 209)
error_code("CannotVerifyAndSignLogicalTime", 210)
error_code("KeyNotFound", 211)
error_code("IncompatibleRollbackAlgorithm", 212)
+error_code("DuplicateSession", 213)
# Error codes 4000-8999 are reserved.
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 5f644811d01..79aaf13f2c2 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -936,6 +936,19 @@ env.Library(
LIBDEPS=[
],
)
+
+env.Library(
+ target='logical_session_cache',
+ source=[
+ 'logical_session_cache.cpp',
+ ],
+ LIBDEPS=[
+ 'logical_session_record',
+ 'sessions_collection',
+ 'service_liason',
+ ],
+)
+
env.Library(
target='logical_time',
source=[
diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp
new file mode 100644
index 00000000000..902bd9c9a02
--- /dev/null
+++ b/src/mongo/db/logical_session_cache.cpp
@@ -0,0 +1,226 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/logical_session_cache.h"
+
+#include "mongo/util/duration.h"
+#include "mongo/util/log.h"
+#include "mongo/util/periodic_runner.h"
+
+namespace mongo {
+
+constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
+constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultTimeout;
+constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh;
+
+LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service,
+ std::unique_ptr<SessionsCollection> collection,
+ Options options)
+ : _refreshInterval(options.refreshInterval),
+ _sessionTimeout(options.sessionTimeout),
+ _service(std::move(service)),
+ _sessionsColl(std::move(collection)),
+ _cache(options.capacity) {
+ PeriodicRunner::PeriodicJob job{[this] { _refresh(); },
+ duration_cast<Milliseconds>(_refreshInterval)};
+ _service->scheduleJob(std::move(job));
+}
+
+LogicalSessionCache::~LogicalSessionCache() {
+ try {
+ _service->join();
+ } catch (...) {
+ // If we failed to join we might still be running a background thread,
+ // log but swallow the error since there is no good way to recover.
+ severe() << "Failed to join background service thread";
+ }
+}
+
+StatusWith<LogicalSessionRecord::Owner> LogicalSessionCache::getOwner(LogicalSessionId lsid) {
+ // Search our local cache first
+ auto owner = getOwnerFromCache(lsid);
+ if (owner.isOK()) {
+ return owner;
+ }
+
+ // Cache miss, must fetch from the sessions collection.
+ auto res = _sessionsColl->fetchRecord(lsid);
+
+ // If we got a valid record, add it to our cache.
+ if (res.isOK()) {
+ auto& record = res.getValue();
+ record.setLastUse(_service->now());
+
+ auto oldRecord = _addToCache(record);
+
+ // If we had a conflicting record for this id, and they aren't the same record,
+ // it could mean that an interloper called endSession and startSession for the
+ // same lsid while we were fetching its record from the sessions collection.
+ // This means our session has been written over, do not allow the caller to use it.
+ // Note: we could find expired versions of our same record here, but they'll compare equal.
+ if (oldRecord && *oldRecord != record) {
+ return {ErrorCodes::NoSuchSession, "no matching session record found"};
+ }
+
+ return record.getSessionOwner();
+ }
+
+ return res.getStatus();
+}
+
+StatusWith<LogicalSessionRecord::Owner> LogicalSessionCache::getOwnerFromCache(
+ LogicalSessionId lsid) {
+ std::unique_lock<stdx::mutex> lk(_cacheMutex);
+ auto it = _cache.find(lsid);
+ if (it == _cache.end()) {
+ return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
+ }
+
+ // Do not use records if they have expired.
+ auto now = _service->now();
+ if (_isDead(it->second, now)) {
+ return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
+ }
+
+ // Update the last use time before returning.
+ it->second.setLastUse(now);
+ return it->second.getSessionOwner();
+}
+
+Status LogicalSessionCache::startSession(LogicalSessionRecord authoritativeRecord) {
+ // Make sure the timestamp makes sense
+ auto now = _service->now();
+ authoritativeRecord.setLastUse(now);
+
+ // Attempt to insert into the sessions collection first. This collection enforces
+ // unique session ids, so it will act as concurrency control for us.
+ auto res = _sessionsColl->insertRecord(authoritativeRecord);
+ if (!res.isOK()) {
+ return res;
+ }
+
+ // Add the new record to our local cache. If we get a conflict here, and the
+ // conflicting record is not dead and is not equal to our record, an interloper
+ // may have ended this session and then created a new one with the same id.
+ // In this case, return a failure.
+ auto oldRecord = _addToCache(authoritativeRecord);
+ if (oldRecord) {
+ if (*oldRecord != authoritativeRecord) {
+ if (!_isDead(*oldRecord, now)) {
+ return {ErrorCodes::DuplicateSession, "session with this id already exists"};
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+void LogicalSessionCache::_refresh() {
+ SessionList activeSessions;
+ SessionList deadSessions;
+
+ auto now = _service->now();
+
+ // We should avoid situations where we have records in the cache
+ // that have been expired from the sessions collection. If they haven't been
+ // used in _sessionTimeout, we should just remove them.
+
+ // Assemble a list of active session records in our cache
+ std::vector<decltype(_cache)::ListEntry> cacheCopy;
+ {
+ stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
+ cacheCopy.assign(_cache.begin(), _cache.end());
+ }
+
+ for (auto& it : cacheCopy) {
+ auto record = it.second;
+ if (!_isDead(record, now)) {
+ activeSessions.push_back(record.getLsid());
+ } else {
+ deadSessions.push_back(record.getLsid());
+ }
+ }
+
+ // Append any active sessions from the service. We should always have
+ // cache entries for active sessions. If we don't, then it is a sign that
+ // the cache needs to be larger, because active session records are being
+ // evicted.
+
+ // Promote our cached entries for all active service sessions to be recently-
+ // used, and update their lastUse dates so we don't lose them to eviction. We
+ // do not need to do this with records from our own cache, which are being used
+ // regularly. Sessions for long-running queries, however, must be kept alive
+ // by us here.
+ auto serviceSessions = _service->getActiveSessions();
+ {
+ stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
+ for (auto lsid : serviceSessions) {
+ auto it = _cache.promote(lsid);
+ if (it != _cache.end()) {
+ // If we have not found our record, it may have been removed
+ // by another thread.
+ it->second.setLastUse(now);
+ }
+ }
+ }
+
+ activeSessions.splice(activeSessions.begin(), serviceSessions);
+
+ // Query into the sessions collection to do the refresh. If any sessions have
+ // failed to refresh, it means their authoritative records were removed, and
+ // we should remove such records from our cache as well.
+ auto failedToRefresh = _sessionsColl->refreshSessions(std::move(activeSessions));
+ deadSessions.splice(deadSessions.begin(), failedToRefresh);
+
+ // Prune any dead records out of the cache. Dead records are ones that failed to
+ // refresh, or ones that have expired locally. We don't make an effort to check
+ // if the locally-expired records still have live authoritative records in the
+ // sessions collection. We also don't attempt to resurrect our expired records.
+ {
+ stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
+ for (auto deadId : deadSessions) {
+ _cache.erase(deadId);
+ }
+ }
+}
+
+bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const {
+ return record.getLastUse() + _sessionTimeout < now;
+}
+
+boost::optional<LogicalSessionRecord> LogicalSessionCache::_addToCache(
+ LogicalSessionRecord record) {
+ stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
+ return _cache.add(record.getLsid(), std::move(record));
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
new file mode 100644
index 00000000000..f7aa671bf39
--- /dev/null
+++ b/src/mongo/db/logical_session_cache.h
@@ -0,0 +1,159 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/base/status_with.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_record.h"
+#include "mongo/db/service_liason.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/stdx/thread.h"
+#include "mongo/util/lru_cache.h"
+
+namespace mongo {
+
+/**
+ * A thread-safe cache structure for logical session records.
+ *
+ * The cache takes ownership of the passed-in ServiceLiason and
+ * SessionsCollection helper types.
+ */
+class LogicalSessionCache {
+public:
+ using SessionList = std::list<LogicalSessionId>;
+
+ static constexpr int kLogicalSessionCacheDefaultCapacity = 10000;
+ static constexpr Minutes kLogicalSessionDefaultTimeout = Minutes(30);
+ static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5);
+
+ /**
+ * An Options type to support the LogicalSessionCache.
+ */
+ struct Options {
+ Options(){};
+
+ /**
+ * The number of session records to keep in the cache.
+ */
+ int capacity = kLogicalSessionCacheDefaultCapacity;
+
+ /**
+ * A timeout value to use for sessions in the cache, in minutes.
+ *
+ * By default, this is set to 30 minutes.
+ */
+ Minutes sessionTimeout = kLogicalSessionDefaultTimeout;
+
+ /**
+ * The interval over which the cache will refresh session records.
+ *
+ * By default, this is set to every 5 minutes. If the caller is
+ * setting the sessionTimeout by hand, it is suggested that they
+ * consider also setting the refresh interval accordingly.
+ */
+ Minutes refreshInterval = kLogicalSessionDefaultRefresh;
+ };
+
+ /**
+ * Construct a new session cache.
+ */
+ explicit LogicalSessionCache(std::unique_ptr<ServiceLiason> service,
+ std::unique_ptr<SessionsCollection> collection,
+ Options options = Options{});
+
+ LogicalSessionCache(const LogicalSessionCache&) = delete;
+ LogicalSessionCache operator=(const LogicalSessionCache&) = delete;
+
+ ~LogicalSessionCache();
+
+ /**
+ * Returns the owner for the given session, or return an error if there
+ * is no authoritative record for this session.
+ *
+ * If the cache does not already contain a record for this session, this
+ * method may issue networking operations to obtain the record. Afterwards,
+ * the cache will keep the record for future use.
+ *
+ * This call will promote any record it touches to be the most-recently-used
+ * record in the cache.
+ */
+ StatusWith<LogicalSessionRecord::Owner> getOwner(LogicalSessionId lsid);
+
+ /**
+ * Returns the owner for the given session if we already have its record in the
+ * cache. Do not fetch the record from the network if we do not already have it.
+ *
+ * This call will promote any record it touches to be the most-recently-used
+ * record in the cache.
+ */
+ StatusWith<LogicalSessionRecord::Owner> getOwnerFromCache(LogicalSessionId lsid);
+
+ /**
+ * Inserts a new authoritative session record into the cache. This method will
+ * insert the authoritative record into the sessions collection. This method
+ * should only be used when starting new sessions and should not be used to
+ * insert records for existing sessions.
+ */
+ Status startSession(LogicalSessionRecord authoritativeRecord);
+
+ /**
+ * Removes all local records in this cache. Does not remove the corresponding
+ * authoritative session records from the sessions collection.
+ */
+ void clear();
+
+private:
+ /**
+ * Internal methods to handle scheduling and perform refreshes for active
+ * session records contained within the cache.
+ */
+ void _refresh();
+
+ /**
+ * Returns true if a record has passed its given expiration.
+ */
+ bool _isDead(const LogicalSessionRecord& record, Date_t now) const;
+
+ /**
+ * Takes the lock and inserts the given record into the cache.
+ */
+ boost::optional<LogicalSessionRecord> _addToCache(LogicalSessionRecord record);
+
+ const Minutes _refreshInterval;
+ const Minutes _sessionTimeout;
+
+ std::unique_ptr<ServiceLiason> _service;
+ std::unique_ptr<SessionsCollection> _sessionsColl;
+
+ stdx::mutex _cacheMutex;
+ LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionId::Hash> _cache;
+};
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 50697ee4e0b..081d4cdec96 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -100,13 +100,12 @@ public:
UUID::Hash _hasher;
};
-
-private:
/**
* This constructor exists for IDL only.
*/
LogicalSessionId();
+private:
/**
* Construct a LogicalSessionId from a UUID.
*/
diff --git a/src/mongo/db/logical_session_record.cpp b/src/mongo/db/logical_session_record.cpp
index 3b3f569f96b..d6904a36f5f 100644
--- a/src/mongo/db/logical_session_record.cpp
+++ b/src/mongo/db/logical_session_record.cpp
@@ -55,8 +55,9 @@ StatusWith<LogicalSessionRecord> LogicalSessionRecord::parse(const BSONObj& bson
LogicalSessionRecord LogicalSessionRecord::makeAuthoritativeRecord(LogicalSessionId id,
UserName user,
- boost::optional<OID> userId) {
- return LogicalSessionRecord(std::move(id), std::move(user), std::move(userId));
+ boost::optional<OID> userId,
+ Date_t now) {
+ return LogicalSessionRecord(std::move(id), std::move(user), std::move(userId), std::move(now));
}
BSONObj LogicalSessionRecord::toBSON() const {
@@ -65,12 +66,20 @@ BSONObj LogicalSessionRecord::toBSON() const {
return builder.obj();
}
+std::string LogicalSessionRecord::toString() const {
+ return str::stream() << "LogicalSessionRecord"
+ << " Id: '" << getLsid() << "'"
+ << " Owner name: '" << getOwner().getUserName() << "'"
+ << " Last-use: " << getLastUse().toString();
+}
+
LogicalSessionRecord::LogicalSessionRecord(LogicalSessionId id,
UserName user,
- boost::optional<OID> userId)
+ boost::optional<OID> userId,
+ Date_t now)
: _owner(std::make_pair(std::move(user), std::move(userId))) {
setLsid(std::move(id));
- setLastUse(Date_t::now());
+ setLastUse(now);
Session_owner owner;
owner.setUserName(_owner.first.getUser());
diff --git a/src/mongo/db/logical_session_record.h b/src/mongo/db/logical_session_record.h
index 7ab60dc94dd..e576cd82eb5 100644
--- a/src/mongo/db/logical_session_record.h
+++ b/src/mongo/db/logical_session_record.h
@@ -71,7 +71,8 @@ public:
*/
static LogicalSessionRecord makeAuthoritativeRecord(LogicalSessionId id,
UserName user,
- boost::optional<OID> userId);
+ boost::optional<OID> userId,
+ Date_t now);
/**
* Return a BSON representation of this session record.
@@ -79,6 +80,19 @@ public:
BSONObj toBSON() const;
/**
+ * Return a string represenation of this session record.
+ */
+ std::string toString() const;
+
+ inline bool operator==(const LogicalSessionRecord& rhs) const {
+ return getLsid() == rhs.getLsid() && getSessionOwner() == rhs.getSessionOwner();
+ }
+
+ inline bool operator!=(const LogicalSessionRecord& rhs) const {
+ return !(*this == rhs);
+ }
+
+ /**
* Return the username and id of the User who owns this session. Only a User
* that matches both the name and id returned by this method should be
* permitted to use this session.
@@ -93,9 +107,20 @@ public:
private:
LogicalSessionRecord() = default;
- LogicalSessionRecord(LogicalSessionId id, UserName user, boost::optional<OID> userId);
+ LogicalSessionRecord(LogicalSessionId id,
+ UserName user,
+ boost::optional<OID> userId,
+ Date_t now);
Owner _owner;
};
+inline std::ostream& operator<<(std::ostream& s, const LogicalSessionRecord& record) {
+ return (s << record.toString());
+}
+
+inline StringBuilder& operator<<(StringBuilder& s, const LogicalSessionRecord& record) {
+ return (s << record.toString());
+}
+
} // namespace mongo