summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache_impl.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-03 16:21:24 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-09 07:40:56 -0400
commit2791817876636c0cfd60d867f31c7a83cf3f18c1 (patch)
tree3aefcb1999cccf4cb53b2401a44857549ba8722a /src/mongo/db/logical_session_cache_impl.cpp
parent1b8a9f5dc5c3314042b55e7415a2a25045b32a94 (diff)
downloadmongo-2791817876636c0cfd60d867f31c7a83cf3f18c1.tar.gz
SERVER-37837 Get rid of TransactionReaper (Part 1)
This change gets rid of the TransactionReaper's usage of the ReplicationCoordinator for checking whether it is primary or not, and makes the LogicalSessionCache joinable on shutdown. It also removes the TransactionReaper's grouping per-shard optimization and moves it all under SessionsCollectionSharded.
Diffstat (limited to 'src/mongo/db/logical_session_cache_impl.cpp')
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp110
1 file changed, 52 insertions, 58 deletions
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 24d278c9150..2095595eb5f 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -33,7 +33,6 @@
#include "mongo/db/logical_session_cache_impl.h"
-#include "mongo/db/logical_session_cache_impl_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -58,13 +57,12 @@ void clearShardingOperationFailedStatus(OperationContext* opCtx) {
} // namespace
-LogicalSessionCacheImpl::LogicalSessionCacheImpl(
- std::unique_ptr<ServiceLiaison> service,
- std::shared_ptr<SessionsCollection> collection,
- std::shared_ptr<TransactionReaper> transactionReaper)
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)) {
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
_stats.setLastSessionsCollectionJobTimestamp(now());
_stats.setLastTransactionReaperJobTimestamp(now());
@@ -73,26 +71,22 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
[this](Client* client) { _periodicRefresh(client); },
Milliseconds(logicalSessionRefreshMillis)});
- if (_transactionReaper) {
- _service->scheduleJob({"LogicalSessionCacheReap",
- [this](Client* client) { _periodicReap(client); },
- Milliseconds(logicalSessionRefreshMillis)});
- }
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
}
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
- try {
- _service->join();
- } catch (...) {
- // If we failed to join we might still be running a background thread, log but swallow the
- // error since there is no good way to recover
- severe() << "Failed to join background service thread";
- }
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
@@ -167,7 +161,7 @@ Date_t LogicalSessionCacheImpl::now() {
}
size_t LogicalSessionCacheImpl::size() {
- stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
}
@@ -189,13 +183,9 @@ void LogicalSessionCacheImpl::_periodicReap(Client* client) {
}
Status LogicalSessionCacheImpl::_reap(Client* client) {
- if (!_transactionReaper) {
- return Status::OK();
- }
-
// Take the lock to update some stats.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the last set of stats for our new run.
_stats.setLastTransactionReaperJobDurationMillis(0);
@@ -206,19 +196,19 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
- int numReaped = 0;
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- try {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ int numReaped = 0;
+ try {
ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
@@ -236,20 +226,23 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- numReaped = _transactionReaper->reap(opCtx);
- } catch (...) {
+ numReaped =
+ _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ opCtx->getServiceContext()->getFastClockSource()->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
- return exceptionToStatus();
+ return ex.toStatus();
}
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
@@ -261,7 +254,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
void LogicalSessionCacheImpl::_refresh(Client* client) {
// Stats for serverStatus:
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
// Clear the refresh-related stats with the beginning of our run.
_stats.setLastSessionsCollectionJobDurationMillis(0);
@@ -276,7 +269,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = makeGuard([this] {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -308,7 +301,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
{
using std::swap;
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
swap(explicitlyEndingSessions, _endingSessions);
swap(activeSessions, _activeSessions);
}
@@ -317,7 +310,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// swapped out of LogicalSessionCache, and merges in any records that had been added since we
// swapped them out.
auto backSwap = [this](auto& member, auto& temp) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
using std::swap;
swap(member, temp);
for (const auto& it : temp) {
@@ -333,9 +326,8 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
activeSessions.erase(lsid);
}
- // refresh all recently active sessions as well as for sessions attached to running ops
-
- LogicalSessionRecordSet activeSessionRecords{};
+ // Refresh all recently active sessions as well as for sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
auto runningOpSessions = _service->getActiveOpSessions();
@@ -354,7 +346,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
}
@@ -362,7 +354,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.dismiss();
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
}
@@ -375,7 +367,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
// Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
// killing cursors on the removed sessions and creating sessions.
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
for (const auto& it : _activeSessions) {
auto newSessionIt = openCursorSessions.find(it.first);
@@ -405,33 +397,34 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
SessionKiller::Matcher matcher(std::move(patterns));
auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
{
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
}
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_endingSessions.insert(begin(sessions), end(sessions));
}
LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
_stats.setActiveSessionsCount(_activeSessions.size());
return _stats;
}
Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
}
+
_activeSessions.insert(std::make_pair(record.getId(), record));
return Status::OK();
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
ret.reserve(_activeSessions.size());
for (const auto& id : _activeSessions) {
@@ -442,7 +435,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
std::vector<LogicalSessionId> ret;
for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
@@ -455,11 +448,12 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
- stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
const auto it = _activeSessions.find(id);
if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
}
+
} // namespace mongo