summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache.cpp
diff options
context:
space:
mode:
authorJason Carey <jcarey@argv.me>2017-07-31 18:33:20 -0400
committerJason Carey <jcarey@argv.me>2017-08-22 12:08:57 -0400
commit5fa946faad4ec2817f6b83684811333270f45c78 (patch)
treeaff3a3345fc2d9594854f387a53755f65750a1a9 /src/mongo/db/logical_session_cache.cpp
parent53a831fd5d462fbd5bc050d0b4eaf5875a41400b (diff)
downloadmongo-5fa946faad4ec2817f6b83684811333270f45c78.tar.gz
SERVER-28342 Ensure session bookkeeping happens
Ensure we properly vivify session records on ingress.
Diffstat (limited to 'src/mongo/db/logical_session_cache.cpp')
-rw-r--r--src/mongo/db/logical_session_cache.cpp215
1 files changed, 1 insertions, 214 deletions
diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp
index c026c2281dd..58760890e1f 100644
--- a/src/mongo/db/logical_session_cache.cpp
+++ b/src/mongo/db/logical_session_cache.cpp
@@ -26,21 +26,12 @@
* it in the license file.
*/
-#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
-
#include "mongo/platform/basic.h"
#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/logical_session_id.h"
-#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
-#include "mongo/platform/atomic_word.h"
-#include "mongo/util/duration.h"
-#include "mongo/util/log.h"
-#include "mongo/util/periodic_runner.h"
namespace mongo {
@@ -51,16 +42,7 @@ const auto getLogicalSessionCache =
const auto getLogicalSessionCacheIsRegistered = ServiceContext::declareDecoration<AtomicBool>();
} // namespace
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize,
- int,
- LogicalSessionCache::kLogicalSessionCacheDefaultCapacity);
-
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRefreshMinutes,
- int,
- LogicalSessionCache::kLogicalSessionDefaultRefresh.count());
-
-constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
-constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh;
+LogicalSessionCache::~LogicalSessionCache() = default;
LogicalSessionCache* LogicalSessionCache::get(ServiceContext* service) {
if (getLogicalSessionCacheIsRegistered(service).load()) {
@@ -80,199 +62,4 @@ void LogicalSessionCache::set(ServiceContext* service,
getLogicalSessionCacheIsRegistered(service).store(true);
}
-LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service,
- std::unique_ptr<SessionsCollection> collection,
- Options options)
- : _refreshInterval(options.refreshInterval),
- _sessionTimeout(options.sessionTimeout),
- _service(std::move(service)),
- _sessionsColl(std::move(collection)),
- _cache(options.capacity) {
- PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); },
- duration_cast<Milliseconds>(_refreshInterval)};
- _service->scheduleJob(std::move(job));
-}
-
-LogicalSessionCache::~LogicalSessionCache() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread,
- // log but swallow the error since there is no good way to recover.
- severe() << "Failed to join background service thread";
- }
-}
-
-Status LogicalSessionCache::promote(LogicalSessionId lsid) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- auto it = _cache.find(lsid);
- if (it == _cache.end()) {
- return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
- }
-
- // Update the last use time before returning.
- it->second.setLastUse(now());
- return Status::OK();
-}
-
-Status LogicalSessionCache::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
- // Add the new record to our local cache. We will insert it into the sessions collection
- // the next time _refresh is called. If there is already a record in the cache for this
- // session, we'll just write over it with our newer, more recent one.
- _addToCache(record);
- return Status::OK();
-}
-
-Status LogicalSessionCache::refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClient& cmd) {
- // Update the timestamps of all these records in our cache.
- auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx);
- for (auto& lsid : sessions) {
- if (!promote(lsid).isOK()) {
- // This is a new record, insert it.
- _addToCache(makeLogicalSessionRecord(opCtx, lsid, now()));
- }
- }
-
- return Status::OK();
-}
-
-Status LogicalSessionCache::refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClusterMember& cmd) {
- LogicalSessionRecordSet toRefresh{};
-
- // Update the timestamps of all these records in our cache.
- auto records = cmd.getRefreshSessionsInternal();
- for (auto& record : records) {
- if (!promote(record.getId()).isOK()) {
- // This is a new record, insert it.
- _addToCache(record);
- }
- toRefresh.insert(record);
- }
-
- // Write to the sessions collection now.
- return _sessionsColl->refreshSessions(opCtx, toRefresh, now());
-}
-
-Status LogicalSessionCache::refreshNow(Client* client) {
- return _refresh(client);
-}
-
-Date_t LogicalSessionCache::now() {
- return _service->now();
-}
-
-size_t LogicalSessionCache::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
- return _cache.size();
-}
-
-void LogicalSessionCache::_periodicRefresh(Client* client) {
- auto res = _refresh(client);
- if (!res.isOK()) {
- log() << "Failed to refresh session cache: " << res;
- }
-
- return;
-}
-
-Status LogicalSessionCache::_refresh(Client* client) {
- LogicalSessionRecordSet activeSessions;
- LogicalSessionRecordSet deadSessions;
-
- auto time = now();
-
- // We should avoid situations where we have records in the cache
- // that have been expired from the sessions collection. If they haven't been
- // used in _sessionTimeout, we should just remove them.
-
- // Assemble a list of active session records in our cache
- std::vector<decltype(_cache)::ListEntry> cacheCopy;
- {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- cacheCopy.assign(_cache.begin(), _cache.end());
- }
-
- for (auto& it : cacheCopy) {
- auto record = it.second;
- if (!_isDead(record, time)) {
- activeSessions.insert(record);
- } else {
- deadSessions.insert(record);
- }
- }
-
- // Append any active sessions from the service. We should always have
- // cache entries for active sessions. If we don't, then it is a sign that
- // the cache needs to be larger, because active session records are being
- // evicted.
-
- // Promote our cached entries for all active service sessions to be recently-
- // used, and update their lastUse dates so we don't lose them to eviction. We
- // do not need to do this with records from our own cache, which are being used
- // regularly. Sessions for long-running queries, however, must be kept alive
- // by us here.
- auto serviceSessions = _service->getActiveSessions();
- {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- for (auto lsid : serviceSessions) {
- auto it = _cache.promote(lsid);
- if (it != _cache.end()) {
- // If we have not found our record, it may have been removed
- // by another thread.
- it->second.setLastUse(time);
- activeSessions.insert(it->second);
- }
-
- // TODO SERVER-29709: Rethink how active sessions interact with refreshes,
- // and potentially move this block above the block where we separate
- // dead sessions from live sessions, above.
- activeSessions.insert(makeLogicalSessionRecord(lsid, time));
- }
- }
-
- // Query into the sessions collection to do the refresh. If any sessions have
- // failed to refresh, it means their authoritative records were removed, and
- // we should remove such records from our cache as well.
- {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
-
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
-
- auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time);
- if (!res.isOK()) {
- // TODO SERVER-29709: handle network errors here.
- return res;
- }
- }
-
- // Prune any dead records out of the cache. Dead records are ones that failed to
- // refresh, or ones that have expired locally. We don't make an effort to check
- // if the locally-expired records still have live authoritative records in the
- // sessions collection. We also don't attempt to resurrect our expired records.
- // However, we *do* keep records alive if they are active on the service.
- {
- // TODO SERVER-29709: handle expiration separately from failure to refresh.
- }
-
- return Status::OK();
-}
-
-bool LogicalSessionCache::_isDead(const LogicalSessionRecord& record, Date_t now) const {
- return record.getLastUse() + _sessionTimeout < now;
-}
-
-boost::optional<LogicalSessionRecord> LogicalSessionCache::_addToCache(
- LogicalSessionRecord record) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- return _cache.add(record.getId(), std::move(record));
-}
-
} // namespace mongo