diff options
Diffstat (limited to 'src')
23 files changed, 245 insertions, 52 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index c0919865dc0..7a685c461ed 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1067,6 +1067,7 @@ env.Library( target='logical_session_cache', source=[ 'logical_session_cache.cpp', + env.Idlc('logical_session_cache_stats.idl')[0], ], LIBDEPS=[ 'logical_session_id', diff --git a/src/mongo/db/cursor_manager.cpp b/src/mongo/db/cursor_manager.cpp index 392b0891697..0b85ccee4ed 100644 --- a/src/mongo/db/cursor_manager.cpp +++ b/src/mongo/db/cursor_manager.cpp @@ -317,15 +317,16 @@ std::vector<GenericCursor> CursorManager::getAllCursors(OperationContext* opCtx) return cursors; } -Status CursorManager::killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { +std::pair<Status, int> CursorManager::killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) { auto eraser = [&](CursorManager& mgr, CursorId id) { uassertStatusOK(mgr.eraseCursor(opCtx, id, true)); }; auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser)); globalCursorIdCache->visitAllCursorManagers(opCtx, &visitor); - return visitor.getStatus(); + + return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled()); } std::size_t CursorManager::timeoutCursorsGlobal(OperationContext* opCtx, Date_t now) { diff --git a/src/mongo/db/cursor_manager.h b/src/mongo/db/cursor_manager.h index add9d7384ba..9683728adab 100644 --- a/src/mongo/db/cursor_manager.h +++ b/src/mongo/db/cursor_manager.h @@ -28,6 +28,8 @@ #pragma once +#include <utility> + #include "mongo/db/catalog/util/partitioned.h" #include "mongo/db/clientcursor.h" #include "mongo/db/cursor_id.h" @@ -93,10 +95,11 @@ public: static std::vector<GenericCursor> getAllCursors(OperationContext* opCtx); /** - * Kills cursors with matching logical sessions. + * Kills cursors with matching logical sessions. Returns a pair with the overall + * Status of the operation and the number of cursors successfully killed. */ - static Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); + static std::pair<Status, int> killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher); CursorManager(NamespaceString nss); diff --git a/src/mongo/db/kill_sessions_common.h b/src/mongo/db/kill_sessions_common.h index 4b6556db7a8..6be4d8f3cec 100644 --- a/src/mongo/db/kill_sessions_common.h +++ b/src/mongo/db/kill_sessions_common.h @@ -84,7 +84,7 @@ public: KillSessionsCursorManagerVisitor(OperationContext* opCtx, const SessionKiller::Matcher& matcher, Eraser&& eraser) - : _opCtx(opCtx), _matcher(matcher), _eraser(eraser) {} + : _opCtx(opCtx), _matcher(matcher), _cursorsKilled(0), _eraser(eraser) {} template <typename Mgr> void operator()(Mgr& mgr) { @@ -99,6 +99,7 @@ public: for (const auto& id : cursors) { try { _eraser(mgr, id); + _cursorsKilled++; } catch (...) { _failures.push_back(exceptionToStatus()); } @@ -123,10 +124,15 @@ public: << _failures.back().reason()); } + int getCursorsKilled() const { + return _cursorsKilled; + } + private: OperationContext* _opCtx; const SessionKiller::Matcher& _matcher; std::vector<Status> _failures; + int _cursorsKilled; Eraser _eraser; }; diff --git a/src/mongo/db/kill_sessions_local.cpp b/src/mongo/db/kill_sessions_local.cpp index 981cab7d85c..2bff47e89cc 100644 --- a/src/mongo/db/kill_sessions_local.cpp +++ b/src/mongo/db/kill_sessions_local.cpp @@ -44,7 +44,8 @@ namespace mongo { SessionKiller::Result killSessionsLocalKillCursors(OperationContext* opCtx, const SessionKiller::Matcher& matcher) { - auto status = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher); + auto res = CursorManager::killCursorsWithMatchingSessions(opCtx, matcher); + auto status = res.first; if (status.isOK()) { return std::vector<HostAndPort>{}; diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 14a14968b67..ece611339ba 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -32,6 +32,7 @@ #include "mongo/base/status.h" #include "mongo/db/commands/end_sessions_gen.h" +#include "mongo/db/logical_session_cache_stats_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/refresh_sessions_gen.h" @@ -126,6 +127,11 @@ public: * Retrieve a LogicalSessionRecord by LogicalSessionId, if it exists in the cache. */ virtual boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const = 0; + + /** + * Returns stats about the logical session cache and its recent operations. + */ + virtual LogicalSessionCacheStats getStats() = 0; }; } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 659cb087a62..ca20ecad1a1 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -71,6 +71,8 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( _service->scheduleJob( {[this](Client* client) { _periodicReap(client); }, _refreshInterval}); } + _stats.setLastSessionsCollectionJobTimestamp(now()); + _stats.setLastTransactionReaperJobTimestamp(now()); } LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { @@ -178,6 +180,21 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } + // Take the lock to update some stats. + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + + // Clear the last set of stats for our new run. + _stats.setLastTransactionReaperJobDurationMillis(0); + _stats.setLastTransactionReaperJobEntriesCleanedUp(0); + + // Start the new run. + _stats.setLastTransactionReaperJobTimestamp(now()); + _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); + } + + int numReaped = 0; + try { boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; auto* const opCtx = [&client, &uniqueCtx] { @@ -188,12 +205,26 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { uniqueCtx.emplace(client->makeOperationContext()); return uniqueCtx->get(); }(); + stdx::lock_guard<stdx::mutex> lk(_reaperMutex); - _transactionReaper->reap(opCtx); + numReaped = _transactionReaper->reap(opCtx); } catch (...) { + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); + _stats.setLastTransactionReaperJobDurationMillis(millis.count()); + } + return exceptionToStatus(); } + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); + _stats.setLastTransactionReaperJobDurationMillis(millis.count()); + _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); + } + return Status::OK(); } @@ -205,6 +236,28 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { return; } + // Stats for serverStatus: + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + + // Clear the refresh-related stats with the beginning of our run. + _stats.setLastSessionsCollectionJobDurationMillis(0); + _stats.setLastSessionsCollectionJobEntriesRefreshed(0); + _stats.setLastSessionsCollectionJobEntriesEnded(0); + _stats.setLastSessionsCollectionJobCursorsClosed(0); + + // Start the new run. + _stats.setLastSessionsCollectionJobTimestamp(now()); + _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1); + } + + // This will finish timing _refresh for our stats no matter when we return. + const auto timeRefreshJob = MakeGuard([this] { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp(); + _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); + }); + // get or make an opCtx boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; auto* const opCtx = [&client, &uniqueCtx] { @@ -272,13 +325,23 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { for (const auto& it : activeSessions) { activeSessionRecords.insert(it.second); } - // refresh the active sessions in the sessions collection + + // Refresh the active sessions in the sessions collection. uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); activeSessionsBackSwapper.Dismiss(); + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size()); + } - // remove the ending sessions from the sessions collection + // Remove the ending sessions from the sessions collection. uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); explicitlyEndingBackSwaper.Dismiss(); + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size()); + } + // Find which running, but not recently active sessions, are expired, and add them // to the list of sessions to kill cursors for @@ -286,6 +349,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { KillAllSessionsByPatternSet patterns; auto openCursorSessions = _service->getOpenCursorSessions(); + // think about pruning ending and active out of openCursorSessions auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); @@ -296,13 +360,17 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { } } - // Add all of the explicitly ended sessions to the list of sessions to kill cursors for - + // Add all of the explicitly ended sessions to the list of sessions to kill cursors for. for (const auto& lsid : explicitlyEndingSessions) { patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } + SessionKiller::Matcher matcher(std::move(patterns)); - _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore(); + auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)); + { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second); + } } void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { @@ -310,6 +378,12 @@ void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { _endingSessions.insert(begin(sessions), end(sessions)); } +LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _stats.setActiveSessionsCount(_activeSessions.size()); + return _stats; +} + void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); _activeSessions.insert(std::make_pair(record.getId(), record)); diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 956b4c1dbda..445159cd4e1 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -125,6 +125,8 @@ public: void endSessions(const LogicalSessionIdSet& sessions) override; + LogicalSessionCacheStats getStats() override; + private: /** * Internal methods to handle scheduling and perform refreshes for active @@ -149,6 +151,10 @@ private: const Minutes _refreshInterval; const Minutes _sessionTimeout; + // This value is only modified under the lock, and is modified + // automatically by the background jobs. + LogicalSessionCacheStats _stats; + std::unique_ptr<ServiceLiason> _service; std::shared_ptr<SessionsCollection> _sessionsColl; diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index eebcb1d224e..acb6531a388 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -87,6 +87,10 @@ public: return boost::none; } + LogicalSessionCacheStats getStats() override { + return {}; + }; + void endSessions(const LogicalSessionIdSet& lsids) override {} }; diff --git a/src/mongo/db/logical_session_cache_stats.idl b/src/mongo/db/logical_session_cache_stats.idl new file mode 100644 index 00000000000..ac4b9d6627c --- /dev/null +++ b/src/mongo/db/logical_session_cache_stats.idl @@ -0,0 +1,63 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +# This IDL file describes the BSON format for a LogicalSessionId, and +# handles the serialization to and deserialization from its BSON representation +# for that class. + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/idl/basic_types.idl" + +structs: + + LogicalSessionCacheStats: + description: "A struct representing the section of the server status + command with information about the logical session cache" + strict: true + fields: + activeSessionsCount: + type: int + default: 0 + sessionsCollectionJobCount: + type: int + default: 0 + lastSessionsCollectionJobDurationMillis: + type: int + default: 0 + lastSessionsCollectionJobTimestamp: + type: date + lastSessionsCollectionJobEntriesRefreshed: + type: int + default: 0 + lastSessionsCollectionJobEntriesEnded: + type: int + default: 0 + lastSessionsCollectionJobCursorsClosed: + type: int + default: 0 + transactionReaperJobCount: + type: int + default: 0 + lastTransactionReaperJobDurationMillis: + type: int + default: 0 + lastTransactionReaperJobTimestamp: + type: date + lastTransactionReaperJobEntriesCleanedUp: + type: int + default: 0 diff --git a/src/mongo/db/logical_session_server_status_section.cpp b/src/mongo/db/logical_session_server_status_section.cpp index e291728ceee..48a2bd5e9ff 100644 --- a/src/mongo/db/logical_session_server_status_section.cpp +++ b/src/mongo/db/logical_session_server_status_section.cpp @@ -50,7 +50,7 @@ public: virtual BSONObj generateSection(OperationContext* opCtx, const BSONElement& configElement) const { auto lsCache = LogicalSessionCache::get(opCtx); - return BSON("records" << static_cast<int64_t>(lsCache ? lsCache->size() : 0)); + return lsCache ? lsCache->getStats().toBSON() : BSONObj(); } } LogicalSessionSSS; diff --git a/src/mongo/db/service_liason.h b/src/mongo/db/service_liason.h index b51076fefe9..2b250e6943b 100644 --- a/src/mongo/db/service_liason.h +++ b/src/mongo/db/service_liason.h @@ -83,10 +83,10 @@ public: virtual Date_t now() const = 0; /** - * deligaes to a similarly named function on a cursormanager + * Deligates to a similarly named function on a cursor manager. */ - virtual Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) = 0; + virtual std::pair<Status, int> killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) = 0; protected: /** diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp index df0d8ab5276..57649dc704d 100644 --- a/src/mongo/db/service_liason_mock.cpp +++ b/src/mongo/db/service_liason_mock.cpp @@ -108,11 +108,11 @@ const KillAllSessionsByPattern* MockServiceLiasonImpl::matchKilled(const Logical return _matcher->match(lsid); } -Status MockServiceLiasonImpl::killCursorsWithMatchingSessions( +std::pair<Status, int> MockServiceLiasonImpl::killCursorsWithMatchingSessions( OperationContext* opCtx, const SessionKiller::Matcher& matcher) { _matcher = matcher; - return Status::OK(); + return std::make_pair(Status::OK(), 0); } } // namespace mongo diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h index f7b6f0be535..19a7d8c321f 100644 --- a/src/mongo/db/service_liason_mock.h +++ b/src/mongo/db/service_liason_mock.h @@ -78,8 +78,8 @@ public: int jobs(); const KillAllSessionsByPattern* matchKilled(const LogicalSessionId& lsid); - Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); + std::pair<Status, int> killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher); private: executor::AsyncTimerFactoryMock* _timerFactory; @@ -120,8 +120,8 @@ public: return _impl->join(); } - Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { + std::pair<Status, int> killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) override { return _impl->killCursorsWithMatchingSessions(opCtx, matcher); } diff --git a/src/mongo/db/service_liason_mongod.cpp b/src/mongo/db/service_liason_mongod.cpp index 8c4ad33a987..70e7ced614b 100644 --- a/src/mongo/db/service_liason_mongod.cpp +++ b/src/mongo/db/service_liason_mongod.cpp @@ -103,8 +103,8 @@ ServiceContext* ServiceLiasonMongod::_context() { return getGlobalServiceContext(); } -Status ServiceLiasonMongod::killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { +std::pair<Status, int> ServiceLiasonMongod::killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) { return CursorManager::getGlobalCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher); } diff --git a/src/mongo/db/service_liason_mongod.h b/src/mongo/db/service_liason_mongod.h index 3feb502c437..562429efca5 100644 --- a/src/mongo/db/service_liason_mongod.h +++ b/src/mongo/db/service_liason_mongod.h @@ -59,8 +59,8 @@ public: Date_t now() const override; - Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) override; + std::pair<Status, int> killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) override; protected: /** diff --git a/src/mongo/db/service_liason_mongos.cpp b/src/mongo/db/service_liason_mongos.cpp index 37c5420d28d..f972d87951f 100644 --- a/src/mongo/db/service_liason_mongos.cpp +++ b/src/mongo/db/service_liason_mongos.cpp @@ -76,8 +76,8 @@ ServiceContext* ServiceLiasonMongos::_context() { return getGlobalServiceContext(); } -Status ServiceLiasonMongos::killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) { +std::pair<Status, int> ServiceLiasonMongos::killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) { auto cursorManager = Grid::get(getGlobalServiceContext())->getCursorManager(); return cursorManager->killCursorsWithMatchingSessions(opCtx, matcher); } diff --git a/src/mongo/db/service_liason_mongos.h b/src/mongo/db/service_liason_mongos.h index 26780f7b702..4ff26b8cb1c 100644 --- a/src/mongo/db/service_liason_mongos.h +++ b/src/mongo/db/service_liason_mongos.h @@ -59,8 +59,8 @@ public: Date_t now() const override; - Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher) override; + std::pair<Status, int> killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) override; protected: /** diff --git a/src/mongo/db/transaction_reaper.cpp b/src/mongo/db/transaction_reaper.cpp index f9187cb9e82..a3b804a839d 100644 --- a/src/mongo/db/transaction_reaper.cpp +++ b/src/mongo/db/transaction_reaper.cpp @@ -40,6 +40,7 @@ #include "mongo/db/server_parameters.h" #include "mongo/db/session_txn_record_gen.h" #include "mongo/db/sessions_collection.h" +#include "mongo/platform/atomic_word.h" #include "mongo/s/catalog_cache.h" #include "mongo/s/client/shard.h" #include "mongo/s/client/shard_registry.h" @@ -104,7 +105,7 @@ public: TransactionReaperImpl(std::shared_ptr<SessionsCollection> collection) : _collection(std::move(collection)) {} - void reap(OperationContext* opCtx) override { + int reap(OperationContext* opCtx) override { Handler handler(opCtx, _collection.get()); Lock::DBLock lk(opCtx, SessionsCollection::kSessionsDb, MODE_IS); @@ -129,17 +130,21 @@ public: handler.handleLsid(transactionSession.get_id()); } } + + // Before the handler goes out of scope, flush its last batch to disk and collect stats. + return handler.finalize(); } private: std::shared_ptr<SessionsCollection> _collection; }; -void handleBatchHelper(SessionsCollection* sessionsCollection, - OperationContext* opCtx, - const LogicalSessionIdSet& batch) { +int handleBatchHelper(SessionsCollection* sessionsCollection, + OperationContext* opCtx, + const LogicalSessionIdSet& batch) { auto removed = uassertStatusOK(sessionsCollection->findRemovedSessions(opCtx, batch)); uassertStatusOK(sessionsCollection->removeTransactionRecords(opCtx, removed)); + return removed.size(); } /** @@ -148,25 +153,35 @@ void handleBatchHelper(SessionsCollection* sessionsCollection, class ReplHandler { public: ReplHandler(OperationContext* opCtx, SessionsCollection* collection) - : _opCtx(opCtx), _sessionsCollection(collection) {} + : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {} ~ReplHandler() { - DESTRUCTOR_GUARD([&] { handleBatchHelper(_sessionsCollection, _opCtx, _batch); }()); + invariant(_finalized.load()); } void handleLsid(const LogicalSessionId& lsid) { _batch.insert(lsid); if (_batch.size() > write_ops::kMaxWriteBatchSize) { - handleBatchHelper(_sessionsCollection, _opCtx, _batch); + _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch); _batch.clear(); } } + int finalize() { + invariant(!_finalized.swap(true)); + _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, _batch); + return _numReaped; + } + private: OperationContext* _opCtx; SessionsCollection* _sessionsCollection; LogicalSessionIdSet _batch; + + int _numReaped; + + AtomicBool _finalized; }; /** @@ -176,14 +191,10 @@ private: class ShardedHandler { public: ShardedHandler(OperationContext* opCtx, SessionsCollection* collection) - : _opCtx(opCtx), _sessionsCollection(collection) {} + : _opCtx(opCtx), _sessionsCollection(collection), _numReaped(0), _finalized(false) {} ~ShardedHandler() { - DESTRUCTOR_GUARD([&] { - for (const auto& pair : _shards) { - handleBatchHelper(_sessionsCollection, _opCtx, pair.second); - } - }()); + invariant(_finalized.load()); } void handleLsid(const LogicalSessionId& lsid) { @@ -210,18 +221,31 @@ public: auto& lsids = _shards[shardId]; lsids.insert(lsid); if (lsids.size() > write_ops::kMaxWriteBatchSize) { - handleBatchHelper(_sessionsCollection, _opCtx, lsids); + _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, lsids); _shards.erase(shardId); } } + int finalize() { + invariant(!_finalized.swap(true)); + for (const auto& pair : _shards) { + _numReaped += handleBatchHelper(_sessionsCollection, _opCtx, pair.second); + } + + return _numReaped; + } + private: OperationContext* _opCtx; SessionsCollection* _sessionsCollection; std::shared_ptr<ChunkManager> _cm; std::shared_ptr<Shard> _primary; + int _numReaped; + stdx::unordered_map<ShardId, LogicalSessionIdSet, ShardId::Hasher> _shards; + + AtomicBool _finalized; }; } // namespace diff --git a/src/mongo/db/transaction_reaper.h b/src/mongo/db/transaction_reaper.h index fdec1083916..16faf13a6b6 100644 --- a/src/mongo/db/transaction_reaper.h +++ b/src/mongo/db/transaction_reaper.h @@ -49,7 +49,7 @@ public: virtual ~TransactionReaper() = 0; - virtual void reap(OperationContext* OperationContext) = 0; + virtual int reap(OperationContext* OperationContext) = 0; /** * The implementation of the sessions collections is different in replica sets versus sharded diff --git a/src/mongo/s/commands/kill_sessions_remote.cpp b/src/mongo/s/commands/kill_sessions_remote.cpp index a89e407eb45..78db3419f10 100644 --- a/src/mongo/s/commands/kill_sessions_remote.cpp +++ b/src/mongo/s/commands/kill_sessions_remote.cpp @@ -110,7 +110,10 @@ SessionKiller::Result parallelExec(OperationContext* opCtx, Status killSessionsRemoteKillCursor(OperationContext* opCtx, const SessionKiller::Matcher& matcher) { - return Grid::get(opCtx)->getCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher); + return Grid::get(opCtx) + ->getCursorManager() + ->killCursorsWithMatchingSessions(opCtx, matcher) + .first; } } // namespace diff --git a/src/mongo/s/query/cluster_cursor_manager.cpp b/src/mongo/s/query/cluster_cursor_manager.cpp index 42ed57e83aa..ed34635a8fc 100644 --- a/src/mongo/s/query/cluster_cursor_manager.cpp +++ b/src/mongo/s/query/cluster_cursor_manager.cpp @@ -520,7 +520,7 @@ std::vector<GenericCursor> ClusterCursorManager::getAllCursors() const { return cursors; } -Status ClusterCursorManager::killCursorsWithMatchingSessions( +std::pair<Status, int> ClusterCursorManager::killCursorsWithMatchingSessions( OperationContext* opCtx, const SessionKiller::Matcher& matcher) { auto eraser = [&](ClusterCursorManager& mgr, CursorId id) { uassertStatusOK(mgr.killCursor(getNamespaceForCursorId(id).get(), id)); @@ -528,7 +528,7 @@ Status ClusterCursorManager::killCursorsWithMatchingSessions( auto visitor = makeKillSessionsCursorManagerVisitor(opCtx, matcher, std::move(eraser)); visitor(*this); - return visitor.getStatus(); + return std::make_pair(visitor.getStatus(), visitor.getCursorsKilled()); } stdx::unordered_set<CursorId> ClusterCursorManager::getCursorsForSession( diff --git a/src/mongo/s/query/cluster_cursor_manager.h b/src/mongo/s/query/cluster_cursor_manager.h index d3392c9a0c0..6bd9e20c3a6 100644 --- a/src/mongo/s/query/cluster_cursor_manager.h +++ b/src/mongo/s/query/cluster_cursor_manager.h @@ -29,6 +29,7 @@ #pragma once #include <memory> +#include <utility> #include <vector> #include "mongo/db/cursor_id.h" @@ -369,8 +370,8 @@ public: */ std::vector<GenericCursor> getAllCursors() const; - Status killCursorsWithMatchingSessions(OperationContext* opCtx, - const SessionKiller::Matcher& matcher); + std::pair<Status, int> killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher); /** * Returns a list of all open cursors for the given session. |