summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/cursor_manager.cpp7
-rw-r--r--src/mongo/db/cursor_manager.h9
-rw-r--r--src/mongo/db/kill_sessions_common.h8
-rw-r--r--src/mongo/db/kill_sessions_local.cpp3
-rw-r--r--src/mongo/db/logical_session_cache.h6
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp86
-rw-r--r--src/mongo/db/logical_session_cache_impl.h6
-rw-r--r--src/mongo/db/logical_session_cache_noop.h4
-rw-r--r--src/mongo/db/logical_session_cache_stats.idl63
-rw-r--r--src/mongo/db/logical_session_server_status_section.cpp2
-rw-r--r--src/mongo/db/service_liason.h6
-rw-r--r--src/mongo/db/service_liason_mock.cpp4
-rw-r--r--src/mongo/db/service_liason_mock.h8
-rw-r--r--src/mongo/db/service_liason_mongod.cpp4
-rw-r--r--src/mongo/db/service_liason_mongod.h4
-rw-r--r--src/mongo/db/service_liason_mongos.cpp4
-rw-r--r--src/mongo/db/service_liason_mongos.h4
-rw-r--r--src/mongo/db/transaction_reaper.cpp52
-rw-r--r--src/mongo/db/transaction_reaper.h2
-rw-r--r--src/mongo/s/commands/kill_sessions_remote.cpp5
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.cpp4
-rw-r--r--src/mongo/s/query/cluster_cursor_manager.h5
23 files changed, 245 insertions, 52 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index c0919865dc0..7a685c461ed 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1067,6 +1067,7 @@ env.Library(
target='logical_session_cache',
source=[
'logical_session_cache.cpp',
+ env.Idlc('logical_session_cache_stats.idl')[0],
],
LIBDEPS=[
'logical_session_id',
diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp
index 392b0891697..0b85ccee4ed 100644
--- a/src/mongo/db/cursor_manager.cpp
+++ b/src/mongo/db/cursor_manager.cpp
@@ -317,15 +317,16 @@ std::vector<GenericCursor> CursorManager::getAllCursors(OperationContext* opCtx)
return cursors;
}
-Status CursorManager::killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
auto eraser = [&](CursorManager& mgr, CursorId id) {
uassertStatusOK(mgr.eraseCursor(opCtx, id, true));
};
auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor);
- return visitor.getStatus();
+
+ return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
}
std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) {
diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h
index add9d7384ba..9683728adab 100644
--- a/src/mongo/db/cursor_manager.h
+++ b/src/mongo/db/cursor_manager.h
@@ -28,6 +28,8 @@
#pragma once
+#include <utility>
+
#include "mongo/db/catalog/util/partitioned.h"
#include "mongo/db/clientcursor.h"
#include "mongo/db/cursor_id.h"
@@ -93,10 +95,11 @@ public:
static std::vector<GenericCursor> getAllCursors(OperationContext* opCtx);
/**
- * Kills cursors with matching logical sessions.
+ * Kills cursors with matching logical sessions. Returns a pair with the overall
+ * Status of the operation and the number of cursors successfully killed.
*/
- static Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
+ static std::pair<Status, int> killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher);
CursorManager(NamespaceString nss);
diff --git a/src/mongo/db/kill_sessions_common.h b/src/mongo/db/kill_sessions_common.h
index 4b6556db7a8..6be4d8f3cec 100644
--- a/src/mongo/db/kill_sessions_common.h
+++ b/src/mongo/db/kill_sessions_common.h
@@ -84,7 +84,7 @@ public:
KillSessionsCursorManagerVisitor(OperationContext* opCtx,
const SessionKiller::Matcher& matcher,
Eraser&& eraser)
- : _opCtx(opCtx), _matcher(matcher), _eraser(eraser) {}
+ : _opCtx(opCtx), _matcher(matcher), _cursorsKilled(0), _eraser(eraser) {}
template <typename Mgr>
void operator()(Mgr& mgr) {
@@ -99,6 +99,7 @@ public:
for (const auto& id : cursors) {
try {
_eraser(mgr, id);
+ _cursorsKilled++;
} catch (...) {
_failures.push_back(exceptionToStatus());
}
@@ -123,10 +124,15 @@ public:
<< _failures.back().reason());
}
+ int getCursorsKilled() const {
+ return _cursorsKilled;
+ }
+
private:
OperationContext* _opCtx;
const SessionKiller::Matcher& _matcher;
std::vector<Status> _failures;
+ int _cursorsKilled;
Eraser _eraser;
};
diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp
index 981cab7d85c..2bff47e89cc 100644
--- a/src/mongo/db/kill_sessions_local.cpp
+++ b/src/mongo/db/kill_sessions_local.cpp
@@ -44,7 +44,8 @@ namespace mongo {
SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx,
const SessionKiller::Matcher& matcher) {
- auto status = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher);
+ auto res = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher);
+ auto status = res.first;
if (status.isOK()) {
return std::vector<HostAndPort>{};
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 14a14968b67..ece611339ba 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -32,6 +32,7 @@
#include "mongo/base/status.h"
#include "mongo/db/commands/end_sessions_gen.h"
+#include "mongo/db/logical_session_cache_stats_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -126,6 +127,11 @@ public:
* Retrieve a LogicalSessionRecord by LogicalSessionId, if it exists in the cache.
*/
virtual boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const = 0;
+
+ /**
+ * Returns stats about the logical session cache and its recent operations.
+ */
+ virtual LogicalSessionCacheStats getStats() = 0;
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 659cb087a62..ca20ecad1a1 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -71,6 +71,8 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
_service->scheduleJob(
{[this](Client* client) { _periodicReap(client); }, _refreshInterval});
}
+ _stats.setLastSessionsCollectionJobTimestamp(now());
+ _stats.setLastTransactionReaperJobTimestamp(now());
}
LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
@@ -178,6 +180,21 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
+ // Take the lock to update some stats.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+
+ // Clear the last set of stats for our new run.
+ _stats.setLastTransactionReaperJobDurationMillis(0);
+ _stats.setLastTransactionReaperJobEntriesCleanedUp(0);
+
+ // Start the new run.
+ _stats.setLastTransactionReaperJobTimestamp(now());
+ _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
+ }
+
+ int numReaped = 0;
+
try {
boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
auto* const opCtx = [&client, &uniqueCtx] {
@@ -188,12 +205,26 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
uniqueCtx.emplace(client->makeOperationContext());
return uniqueCtx->get();
}();
+
stdx::lock_guard<stdx::mutex> lk(_reaperMutex);
- _transactionReaper->reap(opCtx);
+ numReaped = _transactionReaper->reap(opCtx);
} catch (...) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
+ _stats.setLastTransactionReaperJobDurationMillis(millis.count());
+ }
+
return exceptionToStatus();
}
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
+ _stats.setLastTransactionReaperJobDurationMillis(millis.count());
+ _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
+ }
+
return Status::OK();
}
@@ -205,6 +236,28 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
return;
}
+ // Stats for serverStatus:
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+
+ // Clear the refresh-related stats with the beginning of our run.
+ _stats.setLastSessionsCollectionJobDurationMillis(0);
+ _stats.setLastSessionsCollectionJobEntriesRefreshed(0);
+ _stats.setLastSessionsCollectionJobEntriesEnded(0);
+ _stats.setLastSessionsCollectionJobCursorsClosed(0);
+
+ // Start the new run.
+ _stats.setLastSessionsCollectionJobTimestamp(now());
+ _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1);
+ }
+
+ // This will finish timing _refresh for our stats no matter when we return.
+ const auto timeRefreshJob = MakeGuard([this] {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
+ _stats.setLastSessionsCollectionJobDurationMillis(millis.count());
+ });
+
// get or make an opCtx
boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
auto* const opCtx = [&client, &uniqueCtx] {
@@ -272,13 +325,23 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
for (const auto& it : activeSessions) {
activeSessionRecords.insert(it.second);
}
- // refresh the active sessions in the sessions collection
+
+ // Refresh the active sessions in the sessions collection.
uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
activeSessionsBackSwapper.Dismiss();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
+ }
- // remove the ending sessions from the sessions collection
+ // Remove the ending sessions from the sessions collection.
uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
explicitlyEndingBackSwaper.Dismiss();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
+ }
+
// Find which running, but not recently active sessions, are expired, and add them
// to the list of sessions to kill cursors for
@@ -286,6 +349,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
KillAllSessionsByPatternSet patterns;
auto openCursorSessions = _service->getOpenCursorSessions();
+
// think about pruning ending and active out of openCursorSessions
auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions);
@@ -296,13 +360,17 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
}
}
- // Add all of the explicitly ended sessions to the list of sessions to kill cursors for
-
+ // Add all of the explicitly ended sessions to the list of sessions to kill cursors for.
for (const auto& lsid : explicitlyEndingSessions) {
patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
}
+
SessionKiller::Matcher matcher(std::move(patterns));
- _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore();
+ auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
+ {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
+ }
}
void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
@@ -310,6 +378,12 @@ void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
_endingSessions.insert(begin(sessions), end(sessions));
}
+LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _stats.setActiveSessionsCount(_activeSessions.size());
+ return _stats;
+}
+
void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
_activeSessions.insert(std::make_pair(record.getId(), record));
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 956b4c1dbda..445159cd4e1 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -125,6 +125,8 @@ public:
void endSessions(const LogicalSessionIdSet& sessions) override;
+ LogicalSessionCacheStats getStats() override;
+
private:
/**
* Internal methods to handle scheduling and perform refreshes for active
@@ -149,6 +151,10 @@ private:
const Minutes _refreshInterval;
const Minutes _sessionTimeout;
+ // This value is only modified under the lock, and is modified
+ // automatically by the background jobs.
+ LogicalSessionCacheStats _stats;
+
std::unique_ptr<ServiceLiason> _service;
std::shared_ptr<SessionsCollection> _sessionsColl;
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index eebcb1d224e..acb6531a388 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -87,6 +87,10 @@ public:
return boost::none;
}
+ LogicalSessionCacheStats getStats() override {
+ return {};
+ };
+
void endSessions(const LogicalSessionIdSet& lsids) override {}
};
diff --git a/src/mongo/db/logical_session_cache_stats.idl b/src/mongo/db/logical_session_cache_stats.idl
new file mode 100644
index 00000000000..ac4b9d6627c
--- /dev/null
+++ b/src/mongo/db/logical_session_cache_stats.idl
@@ -0,0 +1,63 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+# This IDL file describes the BSON format for a LogicalSessionId, and
+# handles the serialization to and deserialization from its BSON representation
+# for that class.
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/idl/basic_types.idl"
+
+structs:
+
+ LogicalSessionCacheStats:
+ description: "A struct representing the section of the server status
+ command with information about the logical session cache"
+ strict: true
+ fields:
+ activeSessionsCount:
+ type: int
+ default: 0
+ sessionsCollectionJobCount:
+ type: int
+ default: 0
+ lastSessionsCollectionJobDurationMillis:
+ type: int
+ default: 0
+ lastSessionsCollectionJobTimestamp:
+ type: date
+ lastSessionsCollectionJobEntriesRefreshed:
+ type: int
+ default: 0
+ lastSessionsCollectionJobEntriesEnded:
+ type: int
+ default: 0
+ lastSessionsCollectionJobCursorsClosed:
+ type: int
+ default: 0
+ transactionReaperJobCount:
+ type: int
+ default: 0
+ lastTransactionReaperJobDurationMillis:
+ type: int
+ default: 0
+ lastTransactionReaperJobTimestamp:
+ type: date
+ lastTransactionReaperJobEntriesCleanedUp:
+ type: int
+ default: 0
diff --git a/src/mongo/db/logical_session_server_status_section.cpp b/src/mongo/db/logical_session_server_status_section.cpp
index e291728ceee..48a2bd5e9ff 100644
--- a/src/mongo/db/logical_session_server_status_section.cpp
+++ b/src/mongo/db/logical_session_server_status_section.cpp
@@ -50,7 +50,7 @@ public:
virtual BSONObj generateSection(OperationContext* opCtx,
const BSONElement& configElement) const {
auto lsCache = LogicalSessionCache::get(opCtx);
- return BSON("records" << static_cast<int64_t>(lsCache ? lsCache->size() : 0));
+ return lsCache ? lsCache->getStats().toBSON() : BSONObj();
}
} LogicalSessionSSS;
diff --git a/src/mongo/db/service_liason.h b/src/mongo/db/service_liason.h
index b51076fefe9..2b250e6943b 100644
--- a/src/mongo/db/service_liason.h
+++ b/src/mongo/db/service_liason.h
@@ -83,10 +83,10 @@ public:
virtual Date_t now() const = 0;
/**
- * deligaes to a similarly named function on a cursormanager
+ * Deligates to a similarly named function on a cursor manager.
*/
- virtual Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) = 0;
+ virtual std::pair<Status, int> killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) = 0;
protected:
/**
diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp
index df0d8ab5276..57649dc704d 100644
--- a/src/mongo/db/service_liason_mock.cpp
+++ b/src/mongo/db/service_liason_mock.cpp
@@ -108,11 +108,11 @@ const KillAllSessionsByPattern* MockServiceLiasonImpl::matchKilled(const Logical
return _matcher->match(lsid);
}
-Status MockServiceLiasonImpl::killCursorsWithMatchingSessions(
+std::pair<Status, int> MockServiceLiasonImpl::killCursorsWithMatchingSessions(
OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
_matcher = matcher;
- return Status::OK();
+ return std::make_pair(Status::OK(), 0);
}
} // namespace mongo
diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h
index f7b6f0be535..19a7d8c321f 100644
--- a/src/mongo/db/service_liason_mock.h
+++ b/src/mongo/db/service_liason_mock.h
@@ -78,8 +78,8 @@ public:
int jobs();
const KillAllSessionsByPattern* matchKilled(const LogicalSessionId& lsid);
- Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
+ std::pair<Status, int> killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher);
private:
executor::AsyncTimerFactoryMock* _timerFactory;
@@ -120,8 +120,8 @@ public:
return _impl->join();
}
- Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+ std::pair<Status, int> killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) override {
return _impl->killCursorsWithMatchingSessions(opCtx, matcher);
}
diff --git a/src/mongo/db/service_liason_mongod.cpp b/src/mongo/db/service_liason_mongod.cpp
index 8c4ad33a987..70e7ced614b 100644
--- a/src/mongo/db/service_liason_mongod.cpp
+++ b/src/mongo/db/service_liason_mongod.cpp
@@ -103,8 +103,8 @@ ServiceContext* ServiceLiasonMongod::_context() {
return getGlobalServiceContext();
}
-Status ServiceLiasonMongod::killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+std::pair<Status, int> ServiceLiasonMongod::killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
return CursorManager::getGlobalCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher);
}
diff --git a/src/mongo/db/service_liason_mongod.h b/src/mongo/db/service_liason_mongod.h
index 3feb502c437..562429efca5 100644
--- a/src/mongo/db/service_liason_mongod.h
+++ b/src/mongo/db/service_liason_mongod.h
@@ -59,8 +59,8 @@ public:
Date_t now() const override;
- Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) override;
+ std::pair<Status, int> killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) override;
protected:
/**
diff --git a/src/mongo/db/service_liason_mongos.cpp b/src/mongo/db/service_liason_mongos.cpp
index 37c5420d28d..f972d87951f 100644
--- a/src/mongo/db/service_liason_mongos.cpp
+++ b/src/mongo/db/service_liason_mongos.cpp
@@ -76,8 +76,8 @@ ServiceContext* ServiceLiasonMongos::_context() {
return getGlobalServiceContext();
}
-Status ServiceLiasonMongos::killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) {
+std::pair<Status, int> ServiceLiasonMongos::killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
auto cursorManager = Grid::get(getGlobalServiceContext())->getCursorManager();
return cursorManager->killCursorsWithMatchingSessions(opCtx, matcher);
}
diff --git a/src/mongo/db/service_liason_mongos.h b/src/mongo/db/service_liason_mongos.h
index 26780f7b702..4ff26b8cb1c 100644
--- a/src/mongo/db/service_liason_mongos.h
+++ b/src/mongo/db/service_liason_mongos.h
@@ -59,8 +59,8 @@ public:
Date_t now() const override;
- Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher) override;
+ std::pair<Status, int> killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) override;
protected:
/**
diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp
index f9187cb9e82..a3b804a839d 100644
--- a/src/mongo/db/transaction_reaper.cpp
+++ b/src/mongo/db/transaction_reaper.cpp
@@ -40,6 +40,7 @@
#include "mongo/db/server_parameters.h"
#include "mongo/db/session_txn_record_gen.h"
#include "mongo/db/sessions_collection.h"
+#include "mongo/platform/atomic_word.h"
#include "mongo/s/catalog_cache.h"
#include "mongo/s/client/shard.h"
#include "mongo/s/client/shard_registry.h"
@@ -104,7 +105,7 @@ public:
TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection)
: _collection(std::move(collection)) {}
- void reap(OperationContext* opCtx) override {
+ int reap(OperationContext* opCtx) override {
Handler handler(opCtx, _collection.get());
Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IS);
@@ -129,17 +130,21 @@ public:
handler.handleLsid(transactionSession.get_id());
}
}
+
+ // Before the handler goes out of scope, flush its last batch to disk and collect stats.
+ return handler.finalize();
}
private:
std::shared_ptr<SessionsCollection> _collection;
};
-void handleBatchHelper(SessionsCollection* sessionsCollection,
- OperationContext* opCtx,
- const LogicalSessionIdSet& batch) {
+int handleBatchHelper(SessionsCollection* sessionsCollection,
+ OperationContext* opCtx,
+ const LogicalSessionIdSet& batch) {
auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch));
uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed));
+ return removed.size();
}
/**
@@ -148,25 +153,35 @@ void handleBatchHelper(SessionsCollection* sessionsCollection,
class ReplHandler {
public:
ReplHandler(OperationContext* opCtx, SessionsCollection* collection)
- : _opCtx(opCtx), _sessionsCollection(collection) {}
+ : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
~ReplHandler() {
- DESTRUCTOR_GUARD([&] { handleBatchHelper(_sessionsCollection, _opCtx, _batch); }());
+ invariant(_finalized.load());
}
void handleLsid(const LogicalSessionId& lsid) {
_batch.insert(lsid);
if (_batch.size() > write_ops::kMaxWriteBatchSize) {
- handleBatchHelper(_sessionsCollection, _opCtx, _batch);
+ _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
_batch.clear();
}
}
+ int finalize() {
+ invariant(!_finalized.swap(true));
+ _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch);
+ return _numReaped;
+ }
+
private:
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
LogicalSessionIdSet _batch;
+
+ int _numReaped;
+
+ AtomicBool _finalized;
};
/**
@@ -176,14 +191,10 @@ private:
class ShardedHandler {
public:
ShardedHandler(OperationContext* opCtx, SessionsCollection* collection)
- : _opCtx(opCtx), _sessionsCollection(collection) {}
+ : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {}
~ShardedHandler() {
- DESTRUCTOR_GUARD([&] {
- for (const auto& pair : _shards) {
- handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
- }
- }());
+ invariant(_finalized.load());
}
void handleLsid(const LogicalSessionId& lsid) {
@@ -210,18 +221,31 @@ public:
auto& lsids = _shards[shardId];
lsids.insert(lsid);
if (lsids.size() > write_ops::kMaxWriteBatchSize) {
- handleBatchHelper(_sessionsCollection, _opCtx, lsids);
+ _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, lsids);
_shards.erase(shardId);
}
}
+ int finalize() {
+ invariant(!_finalized.swap(true));
+ for (const auto& pair : _shards) {
+ _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, pair.second);
+ }
+
+ return _numReaped;
+ }
+
private:
OperationContext* _opCtx;
SessionsCollection* _sessionsCollection;
std::shared_ptr<ChunkManager> _cm;
std::shared_ptr<Shard> _primary;
+ int _numReaped;
+
stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards;
+
+ AtomicBool _finalized;
};
} // namespace
diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h
index fdec1083916..16faf13a6b6 100644
--- a/src/mongo/db/transaction_reaper.h
+++ b/src/mongo/db/transaction_reaper.h
@@ -49,7 +49,7 @@ public:
virtual ~TransactionReaper() = 0;
- virtual void reap(OperationContext* OperationContext) = 0;
+ virtual int reap(OperationContext* OperationContext) = 0;
/**
* The implementation of the sessions collections is different in replica sets versus sharded
diff --git a/src/mongo/s/commands/kill_sessions_remote.cpp b/src/mongo/s/commands/kill_sessions_remote.cpp
index a89e407eb45..78db3419f10 100644
--- a/src/mongo/s/commands/kill_sessions_remote.cpp
+++ b/src/mongo/s/commands/kill_sessions_remote.cpp
@@ -110,7 +110,10 @@ SessionKiller::Result parallelExec(OperationContext* opCtx,
Status killSessionsRemoteKillCursor(OperationContext* opCtx,
const SessionKiller::Matcher& matcher) {
- return Grid::get(opCtx)->getCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher);
+ return Grid::get(opCtx)
+ ->getCursorManager()
+ ->killCursorsWithMatchingSessions(opCtx, matcher)
+ .first;
}
} // namespace
diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp
index 42ed57e83aa..ed34635a8fc 100644
--- a/src/mongo/s/query/cluster_cursor_manager.cpp
+++ b/src/mongo/s/query/cluster_cursor_manager.cpp
@@ -520,7 +520,7 @@ std::vector<GenericCursor> ClusterCursorManager::getAllCursors() const {
return cursors;
}
-Status ClusterCursorManager::killCursorsWithMatchingSessions(
+std::pair<Status, int> ClusterCursorManager::killCursorsWithMatchingSessions(
OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
auto eraser = [&](ClusterCursorManager& mgr, CursorId id) {
uassertStatusOK(mgr.killCursor(getNamespaceForCursorId(id).get(), id));
@@ -528,7 +528,7 @@ Status ClusterCursorManager::killCursorsWithMatchingSessions(
auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser));
visitor(*this);
- return visitor.getStatus();
+ return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled());
}
stdx::unordered_set<CursorId> ClusterCursorManager::getCursorsForSession(
diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h
index d3392c9a0c0..6bd9e20c3a6 100644
--- a/src/mongo/s/query/cluster_cursor_manager.h
+++ b/src/mongo/s/query/cluster_cursor_manager.h
@@ -29,6 +29,7 @@
#pragma once
#include <memory>
+#include <utility>
#include <vector>
#include "mongo/db/cursor_id.h"
@@ -369,8 +370,8 @@ public:
*/
std::vector<GenericCursor> getAllCursors() const;
- Status killCursorsWithMatchingSessions(OperationContext* opCtx,
- const SessionKiller::Matcher& matcher);
+ std::pair<Status, int> killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher);
/**
* Returns a list of all open cursors for the given session.