summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache_impl.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp53
1 files changed, 47 insertions, 6 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index e84d431083b..ffafcccbd4f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -58,18 +58,22 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, f
constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiason> service,
- std::unique_ptr<SessionsCollection> collection,
- Options options)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(
+ std::unique_ptr<ServiceLiason> service,
+ std::shared_ptr<SessionsCollection> collection,
+ std::shared_ptr<TransactionReaper> transactionReaper,
+ Options options)
: _refreshInterval(options.refreshInterval),
_sessionTimeout(options.sessionTimeout),
_service(std::move(service)),
_sessionsColl(std::move(collection)),
+ _transactionReaper(std::move(transactionReaper)),
_cache(options.capacity) {
if (!disableLogicalSessionCacheRefresh) {
- PeriodicRunner::PeriodicJob job{[this](Client* client) { _periodicRefresh(client); },
- duration_cast<Milliseconds>(_refreshInterval)};
- _service->scheduleJob(std::move(job));
+ _service->scheduleJob(
+ {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval});
+ _service->scheduleJob(
+ {[this](Client* client) { _periodicReap(client); }, _refreshInterval});
}
}
@@ -145,6 +149,10 @@ Status LogicalSessionCacheImpl::refreshNow(Client* client) {
return _refresh(client);
}
+Status LogicalSessionCacheImpl::reapNow(Client* client) {
+ return _reap(client);
+}
+
Date_t LogicalSessionCacheImpl::now() {
return _service->now();
}
@@ -163,6 +171,39 @@ void LogicalSessionCacheImpl::_periodicRefresh(Client* client) {
return;
}
+void LogicalSessionCacheImpl::_periodicReap(Client* client) {
+ auto res = _reap(client);
+ if (!res.isOK()) {
+ log() << "Failed to reap transaction table: " << res;
+ }
+
+ return;
+}
+
+Status LogicalSessionCacheImpl::_reap(Client* client) {
+ if (!_transactionReaper) {
+ return Status::OK();
+ }
+
+ try {
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+ stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
+ _transactionReaper->reap(opCtx);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+
+ return Status::OK();
+}
+
Status LogicalSessionCacheImpl::_refresh(Client* client) {
LogicalSessionRecordSet activeSessions;
LogicalSessionRecordSet deadSessions;