/**
 *    Copyright (C) 2017 MongoDB Inc.
 *
 *    This program is free software: you can redistribute it and/or modify
 *    it under the terms of the GNU Affero General Public License, version 3,
 *    as published by the Free Software Foundation.
 *
 *    This program is distributed in the hope that it will be useful,
 *    but WITHOUT ANY WARRANTY; without even the implied warranty of
 *    MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 *    GNU Affero General Public License for more details.
 *
 *    You should have received a copy of the GNU Affero General Public License
 *    along with this program.  If not, see <http://www.gnu.org/licenses/>.
 *
 *    As a special exception, the copyright holders give permission to link the
 *    code of portions of this program with the OpenSSL library under certain
 *    conditions as described in each individual source file and distribute
 *    linked combinations including the program with the OpenSSL library. You
 *    must comply with the GNU Affero General Public License in all respects
 *    for all of the code used other than as permitted herein. If you modify
 *    file(s) with this exception, you may extend this exception to your
 *    version of the file(s), but you are not obligated to do so. If you do not
 *    wish to do so, delete this exception statement from your version. If you
 *    delete this exception statement from all source files in the program,
 *    then also delete it in the license file.
*/ #define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl #include "mongo/platform/basic.h" #include "mongo/db/logical_session_cache_impl.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" #include "mongo/platform/atomic_word.h" #include "mongo/util/duration.h" #include "mongo/util/log.h" #include "mongo/util/periodic_runner.h" #include "mongo/util/scopeguard.h" namespace mongo { MONGO_EXPORT_STARTUP_SERVER_PARAMETER( logicalSessionRefreshMinutes, int, LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh.count()); MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false); constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; LogicalSessionCacheImpl::LogicalSessionCacheImpl( std::unique_ptr service, std::shared_ptr collection, std::shared_ptr transactionReaper, Options options) : _refreshInterval(options.refreshInterval), _sessionTimeout(options.sessionTimeout), _service(std::move(service)), _sessionsColl(std::move(collection)), _transactionReaper(std::move(transactionReaper)) { if (!disableLogicalSessionCacheRefresh) { _service->scheduleJob({"LogicalSessionCacheRefresh", [this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); if (_transactionReaper) { _service->scheduleJob({"LogicalSessionCacheReap", [this](Client* client) { _periodicReap(client); }, _refreshInterval}); } } _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setLastTransactionReaperJobTimestamp(now()); } LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { try { _service->join(); } catch (...) { // If we failed to join we might still be running a background thread, // log but swallow the error since there is no good way to recover. 
severe() << "Failed to join background service thread"; } } Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { stdx::lock_guard lk(_cacheMutex); auto it = _activeSessions.find(lsid); if (it == _activeSessions.end()) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; } return Status::OK(); } void LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { // Add the new record to our local cache. We will insert it into the sessions collection // the next time _refresh is called. If there is already a record in the cache for this // session, we'll just write over it with our newer, more recent one. _addToCache(record); } Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClient& cmd) { // Update the timestamps of all these records in our cache. auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx); for (const auto& lsid : sessions) { if (!promote(lsid).isOK()) { // This is a new record, insert it. _addToCache(makeLogicalSessionRecord(opCtx, lsid, now())); } } return Status::OK(); } Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClusterMember& cmd) { // Update the timestamps of all these records in our cache. auto records = cmd.getRefreshSessionsInternal(); for (const auto& record : records) { if (!promote(record.getId()).isOK()) { // This is a new record, insert it. _addToCache(record); } } return Status::OK(); } void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { if (!promote(lsid).isOK()) { startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())); } } Status LogicalSessionCacheImpl::refreshNow(Client* client) { try { _refresh(client); } catch (...) 
{ return exceptionToStatus(); } return Status::OK(); } Status LogicalSessionCacheImpl::reapNow(Client* client) { return _reap(client); } Date_t LogicalSessionCacheImpl::now() { return _service->now(); } size_t LogicalSessionCacheImpl::size() { stdx::lock_guard lock(_cacheMutex); return _activeSessions.size(); } void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { try { _refresh(client); } catch (...) { log() << "Failed to refresh session cache: " << exceptionToStatus(); } } void LogicalSessionCacheImpl::_periodicReap(Client* client) { auto res = _reap(client); if (!res.isOK()) { log() << "Failed to reap transaction table: " << res; } return; } Status LogicalSessionCacheImpl::_reap(Client* client) { if (!_transactionReaper) { return Status::OK(); } // Take the lock to update some stats. { stdx::lock_guard lk(_cacheMutex); // Clear the last set of stats for our new run. _stats.setLastTransactionReaperJobDurationMillis(0); _stats.setLastTransactionReaperJobEntriesCleanedUp(0); // Start the new run. _stats.setLastTransactionReaperJobTimestamp(now()); _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); } int numReaped = 0; try { boost::optional uniqueCtx; auto* const opCtx = [&client, &uniqueCtx] { if (client->getOperationContext()) { return client->getOperationContext(); } uniqueCtx.emplace(client->makeOperationContext()); return uniqueCtx->get(); }(); auto res = _sessionsColl->setupSessionsCollection(opCtx); if (!res.isOK()) { log() << "Sessions collection is not set up; " << "waiting until next sessions reap interval: " << res.reason(); return Status::OK(); } stdx::lock_guard lk(_reaperMutex); numReaped = _transactionReaper->reap(opCtx); } catch (...) 
{ { stdx::lock_guard lk(_cacheMutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); } return exceptionToStatus(); } { stdx::lock_guard lk(_cacheMutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); } return Status::OK(); } void LogicalSessionCacheImpl::_refresh(Client* client) { // Stats for serverStatus: { stdx::lock_guard lk(_cacheMutex); // Clear the refresh-related stats with the beginning of our run. _stats.setLastSessionsCollectionJobDurationMillis(0); _stats.setLastSessionsCollectionJobEntriesRefreshed(0); _stats.setLastSessionsCollectionJobEntriesEnded(0); _stats.setLastSessionsCollectionJobCursorsClosed(0); // Start the new run. _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1); } // This will finish timing _refresh for our stats no matter when we return. 
const auto timeRefreshJob = MakeGuard([this] { stdx::lock_guard lk(_cacheMutex); auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp(); _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); }); // get or make an opCtx boost::optional uniqueCtx; auto* const opCtx = [&client, &uniqueCtx] { if (client->getOperationContext()) { return client->getOperationContext(); } uniqueCtx.emplace(client->makeOperationContext()); return uniqueCtx->get(); }(); auto res = _sessionsColl->setupSessionsCollection(opCtx); if (!res.isOK()) { log() << "Sessions collection is not set up; " << "waiting until next sessions refresh interval: " << res.reason(); return; } LogicalSessionIdSet staleSessions; LogicalSessionIdSet explicitlyEndingSessions; LogicalSessionIdMap activeSessions; // backSwapper creates a guard that in the case of a exception // replaces the ending or active sessions that swapped out of of LogicalSessionCache, // and merges in any records that had been added since we swapped them // out. 
auto backSwapper = [this](auto& member, auto& temp) { return MakeGuard([this, &member, &temp] { stdx::lock_guard lk(_cacheMutex); using std::swap; swap(member, temp); for (const auto& it : temp) { member.emplace(it); } }); }; { using std::swap; stdx::lock_guard lk(_cacheMutex); swap(explicitlyEndingSessions, _endingSessions); swap(activeSessions, _activeSessions); } auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions); auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions); // remove all explicitlyEndingSessions from activeSessions for (const auto& lsid : explicitlyEndingSessions) { activeSessions.erase(lsid); } // refresh all recently active sessions as well as for sessions attached to running ops LogicalSessionRecordSet activeSessionRecords{}; auto runningOpSessions = _service->getActiveOpSessions(); for (const auto& it : runningOpSessions) { // if a running op is the cause of an upsert, we won't have a user name for the record if (explicitlyEndingSessions.count(it) > 0) { continue; } activeSessionRecords.insert(makeLogicalSessionRecord(it, now())); } for (const auto& it : activeSessions) { activeSessionRecords.insert(it.second); } // Refresh the active sessions in the sessions collection. uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); activeSessionsBackSwapper.Dismiss(); { stdx::lock_guard lk(_cacheMutex); _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size()); } // Remove the ending sessions from the sessions collection. 
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); explicitlyEndingBackSwaper.Dismiss(); { stdx::lock_guard lk(_cacheMutex); _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size()); } // Find which running, but not recently active sessions, are expired, and add them // to the list of sessions to kill cursors for KillAllSessionsByPatternSet patterns; auto openCursorSessions = _service->getOpenCursorSessions(); // think about pruning ending and active out of openCursorSessions auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); if (statusAndRemovedSessions.isOK()) { auto removedSessions = statusAndRemovedSessions.getValue(); for (const auto& lsid : removedSessions) { patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } } else { // Ignore errors. } // Add all of the explicitly ended sessions to the list of sessions to kill cursors for. for (const auto& lsid : explicitlyEndingSessions) { patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } SessionKiller::Matcher matcher(std::move(patterns)); auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)); { stdx::lock_guard lk(_cacheMutex); _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second); } } void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { stdx::lock_guard lk(_cacheMutex); _endingSessions.insert(begin(sessions), end(sessions)); } LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() { stdx::lock_guard lk(_cacheMutex); _stats.setActiveSessionsCount(_activeSessions.size()); return _stats; } void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { stdx::lock_guard lk(_cacheMutex); _activeSessions.insert(std::make_pair(record.getId(), record)); } std::vector LogicalSessionCacheImpl::listIds() const { stdx::lock_guard lk(_cacheMutex); std::vector ret; ret.reserve(_activeSessions.size()); for (const auto& id : 
_activeSessions) { ret.push_back(id.first); } return ret; } std::vector LogicalSessionCacheImpl::listIds( const std::vector& userDigests) const { stdx::lock_guard lk(_cacheMutex); std::vector ret; for (const auto& it : _activeSessions) { if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != userDigests.cend()) { ret.push_back(it.first); } } return ret; } boost::optional LogicalSessionCacheImpl::peekCached( const LogicalSessionId& id) const { stdx::lock_guard lk(_cacheMutex); const auto it = _activeSessions.find(id); if (it == _activeSessions.end()) { return boost::none; } return it->second; } } // namespace mongo