diff options
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 110 |
1 files changed, 52 insertions, 58 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 24d278c9150..2095595eb5f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -33,7 +33,6 @@ #include "mongo/db/logical_session_cache_impl.h" -#include "mongo/db/logical_session_cache_impl_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" @@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) { } // namespace -LogicalSessionCacheImpl::LogicalSessionCacheImpl( - std::unique_ptr<ServiceLiaison> service, - std::shared_ptr<SessionsCollection> collection, - std::shared_ptr<TransactionReaper> transactionReaper) +LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, + std::shared_ptr<SessionsCollection> collection, + ReapSessionsOlderThanFn reapSessionsOlderThanFn) : _service(std::move(service)), _sessionsColl(std::move(collection)), - _transactionReaper(std::move(transactionReaper)) { + _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) { _stats.setLastSessionsCollectionJobTimestamp(now()); _stats.setLastTransactionReaperJobTimestamp(now()); @@ -73,26 +71,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( [this](Client* client) { _periodicRefresh(client); }, Milliseconds(logicalSessionRefreshMillis)}); - if (_transactionReaper) { - _service->scheduleJob({"LogicalSessionCacheReap", - [this](Client* client) { _periodicReap(client); }, - Milliseconds(logicalSessionRefreshMillis)}); - } + _service->scheduleJob({"LogicalSessionCacheReap", + [this](Client* client) { _periodicReap(client); }, + Milliseconds(logicalSessionRefreshMillis)}); } } LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { - try { - _service->join(); - } catch (...) { - // If we failed to join we might still be running a background thread, log but swallow the - // error since there is no good way to recover - severe() << "Failed to join background service thread"; - } + joinOnShutDown(); +} + +void LogicalSessionCacheImpl::joinOnShutDown() { + _service->join(); } Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto it = _activeSessions.find(lsid); if (it == _activeSessions.end()) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; @@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() { } size_t LogicalSessionCacheImpl::size() { - stdx::lock_guard<stdx::mutex> lock(_cacheMutex); + stdx::lock_guard<stdx::mutex> lock(_mutex); return _activeSessions.size(); } @@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) { } Status LogicalSessionCacheImpl::_reap(Client* client) { - if (!_transactionReaper) { - return Status::OK(); - } - // Take the lock to update some stats. { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); // Clear the last set of stats for our new run. _stats.setLastTransactionReaperJobDurationMillis(0); @@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); } - int numReaped = 0; + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } - try { - boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; - auto* const opCtx = [&client, &uniqueCtx] { - if (client->getOperationContext()) { - return client->getOperationContext(); - } + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); - uniqueCtx.emplace(client->makeOperationContext()); - return uniqueCtx->get(); - }(); + int numReaped = 0; + try { ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); }); auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx); @@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } - stdx::lock_guard<stdx::mutex> lk(_reaperMutex); - numReaped = _transactionReaper->reap(opCtx); - } catch (...) { + numReaped = + _reapSessionsOlderThanFn(opCtx, + *_sessionsColl, + opCtx->getServiceContext()->getFastClockSource()->now() - + Minutes(gTransactionRecordMinimumLifetimeMinutes)); + } catch (const DBException& ex) { { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); } - return exceptionToStatus(); + return ex.toStatus(); } { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); @@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { void LogicalSessionCacheImpl::_refresh(Client* client) { // Stats for serverStatus: { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); // Clear the refresh-related stats with the beginning of our run. _stats.setLastSessionsCollectionJobDurationMillis(0); @@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // This will finish timing _refresh for our stats no matter when we return. const auto timeRefreshJob = makeGuard([this] { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp(); _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); }); @@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { { using std::swap; - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); swap(explicitlyEndingSessions, _endingSessions); swap(activeSessions, _activeSessions); } @@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // swapped out of LogicalSessionCache, and merges in any records that had been added since we // swapped them out. auto backSwap = [this](auto& member, auto& temp) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); using std::swap; swap(member, temp); for (const auto& it : temp) { @@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { activeSessions.erase(lsid); } - // refresh all recently active sessions as well as for sessions attached to running ops - - LogicalSessionRecordSet activeSessionRecords{}; + // Refresh all recently active sessions as well as for sessions attached to running ops + LogicalSessionRecordSet activeSessionRecords; auto runningOpSessions = _service->getActiveOpSessions(); @@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); activeSessionsBackSwapper.dismiss(); { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size()); } @@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); explicitlyEndingBackSwaper.dismiss(); { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size()); } @@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { // Exclude sessions added to _activeSessions from the openCursorSession to avoid race between // killing cursors on the removed sessions and creating sessions. { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); for (const auto& it : _activeSessions) { auto newSessionIt = openCursorSessions.find(it.first); @@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { SessionKiller::Matcher matcher(std::move(patterns)); auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)); { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second); } } void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _endingSessions.insert(begin(sessions), end(sessions)); } LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); _stats.setActiveSessionsCount(_activeSessions.size()); return _stats; } Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) { return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"}; } + _activeSessions.insert(std::make_pair(record.getId(), record)); return Status::OK(); } std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); std::vector<LogicalSessionId> ret; ret.reserve(_activeSessions.size()); for (const auto& id : _activeSessions) { @@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( const std::vector<SHA256Block>& userDigests) const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); std::vector<LogicalSessionId> ret; for (const auto& it : _activeSessions) { if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != @@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( const LogicalSessionId& id) const { - stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + stdx::lock_guard<stdx::mutex> lk(_mutex); const auto it = _activeSessions.find(id); if (it == _activeSessions.end()) { return boost::none; } return it->second; } + } // namespace mongo |