diff options
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 53 |
1 files changed, 47 insertions, 6 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index e84d431083b..ffafcccbd4f 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -58,18 +58,22 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, f constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; -LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service, - std::unique_ptr<SessionsCollection> collection, - Options options) +LogicalSessionCacheImpl::LogicalSessionCacheImpl( + std::unique_ptr<ServiceLiason> service, + std::shared_ptr<SessionsCollection> collection, + std::shared_ptr<TransactionReaper> transactionReaper, + Options options) : _refreshInterval(options.refreshInterval), _sessionTimeout(options.sessionTimeout), _service(std::move(service)), _sessionsColl(std::move(collection)), + _transactionReaper(std::move(transactionReaper)), _cache(options.capacity) { if (!disableLogicalSessionCacheRefresh) { - PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); }, - duration_cast<Milliseconds>(_refreshInterval)}; - _service->scheduleJob(std::move(job)); + _service->scheduleJob( + {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); + _service->scheduleJob( + {[this](Client* client) { _periodicReap(client); }, _refreshInterval}); } } @@ -145,6 +149,10 @@ Status LogicalSessionCacheImpl::refreshNow(Client* client) { return _refresh(client); } +Status LogicalSessionCacheImpl::reapNow(Client* client) { + return _reap(client); +} + Date_t LogicalSessionCacheImpl::now() { return _service->now(); } @@ -163,6 +171,39 @@ void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { return; } +void LogicalSessionCacheImpl::_periodicReap(Client* client) { + auto res = _reap(client); + if (!res.isOK()) { + log() << "Failed to reap transaction table: " << res; + } + + return; +} + +Status LogicalSessionCacheImpl::_reap(Client* client) { + if (!_transactionReaper) { + return Status::OK(); + } + + try { + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + stdx::lock_guard<stdx::mutex> lk(_reaperMutex); + _transactionReaper->reap(opCtx); + } catch (...) { + return exceptionToStatus(); + } + + return Status::OK(); +} + Status LogicalSessionCacheImpl::_refresh(Client* client) { LogicalSessionRecordSet activeSessions; LogicalSessionRecordSet deadSessions; |