diff options
author | Gabriel Russell <gabriel.russell@mongodb.com> | 2017-09-05 12:14:27 -0400 |
---|---|---|
committer | Gabriel Russell <gabriel.russell@mongodb.com> | 2017-09-21 15:29:58 -0400 |
commit | 8beb002326a5fdd82c694497b7ebcb52a593a3d3 (patch) | |
tree | 5fefa9a1b037dedab30a81abc6bfa3193b564df2 | |
parent | daefad8112937c847282a661392179b9afab0b87 (diff) | |
download | mongo-8beb002326a5fdd82c694497b7ebcb52a593a3d3.tar.gz |
SERVER-28336 endSessions command
33 files changed, 636 insertions, 370 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js index b56d61fe7a7..6cee9851545 100644 --- a/jstests/core/views/views_all_commands.js +++ b/jstests/core/views/views_all_commands.js @@ -204,6 +204,7 @@ expectFailure: true, }, enableSharding: {skip: "Tested as part of shardCollection"}, + endSessions: {skip: isUnrelated}, eval: {skip: isUnrelated}, explain: {command: {explain: {count: "view"}}}, features: {skip: isUnrelated}, @@ -374,6 +375,10 @@ {command: {planCacheListQueryShapes: "view"}, expectFailure: true}, planCacheSetFilter: {command: {planCacheSetFilter: "view"}, expectFailure: true}, profile: {skip: isUnrelated}, + refreshLogicalSessionCacheNow: {skip: isAnInternalCommand}, + reapLogicalSessionCacheNow: {skip: isAnInternalCommand}, + refreshSessions: {skip: isUnrelated}, + refreshSessionsInternal: {skip: isAnInternalCommand}, reIndex: {command: {reIndex: "view"}, expectFailure: true}, removeShard: {skip: isUnrelated}, removeShardFromZone: {skip: isUnrelated}, @@ -482,10 +487,6 @@ }, stageDebug: {skip: isAnInternalCommand}, startSession: {skip: isAnInternalCommand}, - reapLogicalSessionCacheNow: {skip: isAnInternalCommand}, - refreshLogicalSessionCacheNow: {skip: isAnInternalCommand}, - refreshSessions: {skip: isUnrelated}, - refreshSessionsInternal: {skip: isAnInternalCommand}, top: {skip: "tested in views/views_stats.js"}, touch: { command: {touch: "view", data: true}, diff --git a/jstests/noPassthrough/end_sessions_command.js b/jstests/noPassthrough/end_sessions_command.js new file mode 100644 index 00000000000..3d6ae307a52 --- /dev/null +++ b/jstests/noPassthrough/end_sessions_command.js @@ -0,0 +1,70 @@ +(function() { + "use script"; + + var res; + var refresh = {refreshLogicalSessionCacheNow: 1}; + var startSession = {startSession: 1}; + + // Start up a standalone server. + var conn = MongoRunner.runMongod({nojournal: ""}); + var admin = conn.getDB("admin"); + + // Trigger an initial refresh, as a sanity check. + res = admin.runCommand(refresh); + assert.commandWorked(res, "failed to refresh"); + + var sessions = []; + for (var i = 0; i < 20; i++) { + res = admin.runCommand(startSession); + assert.commandWorked(res, "unable to start session"); + sessions.push(res); + } + + res = admin.runCommand(refresh); + assert.commandWorked(res, "failed to refresh"); + + assert.eq(admin.system.sessions.count(), 20, "refresh should have written 20 session records"); + + var endSessionsIds = []; + for (var i = 0; i < 10; i++) { + endSessionsIds.push(sessions[i].id); + } + res = admin.runCommand({endSessions: endSessionsIds}); + assert.commandWorked(res, "failed to end sessions"); + + res = admin.runCommand(refresh); + assert.commandWorked(res, "failed to refresh"); + + assert.eq(admin.system.sessions.count(), + 10, + "endSessions and refresh should result in 10 remaining sessions"); + + // double delete the remaining 10 + endSessionsIds = []; + for (var i = 10; i < 20; i++) { + endSessionsIds.push(sessions[i].id); + endSessionsIds.push(sessions[i].id); + } + + res = admin.runCommand({endSessions: endSessionsIds}); + assert.commandWorked(res, "failed to end sessions"); + + res = admin.runCommand(refresh); + assert.commandWorked(res, "failed to refresh"); + + assert.eq(admin.system.sessions.count(), + 0, + "endSessions and refresh should result in 0 remaining sessions"); + + // delete some sessions that were never created + res = admin.runCommand({ + endSessions: [ + {"id": UUID("bacb219c-214c-47f9-a94a-6c7f434b3bae")}, + {"id": UUID("bacb219c-214c-47f9-a94a-6c7f434b3baf")} + ] + }); + + res = admin.runCommand(refresh); + assert.commandWorked(res, "failed to refresh"); + +}()); diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js index 5b49fd1dc24..e8a285456f8 100644 --- a/jstests/sharding/safe_secondary_reads_drop_recreate.js +++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js @@ -155,6 +155,7 @@ dropUser: {skip: "primary only"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, + endSessions: {skip: "does not return user data"}, eval: {skip: "primary only"}, explain: {skip: "TODO SERVER-30068"}, features: {skip: "does not return user data"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js index 44ac466c435..64a7a674cf7 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js @@ -179,6 +179,7 @@ dropUser: {skip: "primary only"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, + endSessions: {skip: "does not return user data"}, eval: {skip: "primary only"}, explain: {skip: "TODO SERVER-30068"}, features: {skip: "does not return user data"}, diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js index bf09aebe9f5..4e728ba4f9a 100644 --- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js +++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js @@ -157,6 +157,7 @@ dropUser: {skip: "primary only"}, emptycapped: {skip: "primary only"}, enableSharding: {skip: "primary only"}, + endSessions: {skip: "does not return user data"}, eval: {skip: "primary only"}, explain: {skip: "TODO SERVER-30068"}, features: {skip: "does not return user data"}, diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 70151071778..22074f2f637 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -949,11 +949,12 @@ envWithAsio.Library( LIBDEPS=[ '$BUILD_DIR/mongo/executor/async_timer_mock', '$BUILD_DIR/mongo/util/periodic_runner_asio', + 'kill_sessions', 'service_liason', ], ) -env.Library( +envWithAsio.Library( target='service_liason_mongod', source=[ 'service_liason_mongod.cpp', @@ -1072,6 +1073,7 @@ env.Library( 'initialize_operation_session_info.cpp', 'logical_session_cache_impl.cpp', 'logical_session_server_status_section.cpp', + env.Idlc('commands/end_sessions.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/db/commands/server_status', @@ -1081,6 +1083,7 @@ env.Library( 'sessions_collection', 'server_parameters', 'service_liason', + 'kill_sessions', ], ) @@ -1109,6 +1112,7 @@ envWithAsio.CppUnitTest( LIBDEPS=[ '$BUILD_DIR/mongo/db/service_context_noop_init', '$BUILD_DIR/mongo/executor/async_timer_mock', + 'auth/authmocks', 'keys_collection_manager', 'keys_collection_document', 'logical_clock', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 99cb82a1298..31f8600cd16 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -57,6 +57,7 @@ env.Library( "conn_pool_sync.cpp", "connection_status.cpp", "copydb_common.cpp", + "end_sessions_command.cpp", "fail_point_cmd.cpp", "feature_compatibility_version_command_parser.cpp", "find_and_modify_common.cpp", @@ -93,6 +94,7 @@ env.Library( '$BUILD_DIR/mongo/db/lasterror', '$BUILD_DIR/mongo/db/log_process_details', '$BUILD_DIR/mongo/db/logical_session_cache', + '$BUILD_DIR/mongo/db/logical_session_cache_impl', '$BUILD_DIR/mongo/db/logical_session_id', '$BUILD_DIR/mongo/db/logical_session_id_helpers', '$BUILD_DIR/mongo/db/matcher/expressions', diff --git a/src/mongo/db/commands/end_sessions.idl b/src/mongo/db/commands/end_sessions.idl new file mode 100644 index 00000000000..877eddd72f4 --- /dev/null +++ b/src/mongo/db/commands/end_sessions.idl @@ -0,0 +1,28 @@ +# Copyright (C) 2017 MongoDB Inc. +# +# This program is free software: you can redistribute it and/or modify +# it under the terms of the GNU Affero General Public License, version 3, +# as published by the Free Software Foundation. +# +# This program is distributed in the hope that it will be useful, +# but WITHOUT ANY WARRANTY; without even the implied warranty of +# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +# GNU Affero General Public License for more details. +# +# You should have received a copy of the GNU Affero General Public License +# along with this program. If not, see <http://www.gnu.org/licenses/>. +# + +global: + cpp_namespace: "mongo" + +imports: + - "mongo/db/logical_session_id.idl" + +structs: + + EndSessionsCmdFromClient: + description: "A struct representing an endSessions command from a client" + strict: false + fields: + endSessions: array<LogicalSessionFromClient> diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp new file mode 100644 index 00000000000..6a07059f472 --- /dev/null +++ b/src/mongo/db/commands/end_sessions_command.cpp @@ -0,0 +1,89 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/base/init.h" +#include "mongo/db/auth/authorization_session.h" +#include "mongo/db/client.h" +#include "mongo/db/commands.h" +#include "mongo/db/logical_session_cache.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +class EndSessionsCommand final : public BasicCommand { + MONGO_DISALLOW_COPYING(EndSessionsCommand); + +public: + EndSessionsCommand() : BasicCommand("endSessions") {} + + bool slaveOk() const override { + return true; + } + bool adminOnly() const override { + return false; + } + bool supportsWriteConcern(const BSONObj& cmd) const override { + return false; + } + void help(std::stringstream& help) const override { + help << "end a set of logical sessions"; + } + Status checkAuthForOperation(OperationContext* opCtx, + const std::string& dbname, + const BSONObj& cmdObj) override { + + // It is always ok to run this command, as long as you are authenticated + // as some user, if auth is enabled. + AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient()); + try { + auto user = authSession->getSingleUser(); + invariant(user); + return Status::OK(); + } catch (...) { + return exceptionToStatus(); + } + } + + virtual bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + + auto lsCache = LogicalSessionCache::get(opCtx); + + auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj); + + lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx)); + return true; + } +} endSessionsCommand; + +} // namespace mongo diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp index a8ae4219a34..63fe696057d 100644 --- a/src/mongo/db/commands/start_session_command.cpp +++ b/src/mongo/db/commands/start_session_command.cpp @@ -90,11 +90,7 @@ public: return appendCommandStatus(result, status); } - Status startSessionStatus = lsCache->startSession(opCtx, record.get()); - - if (!startSessionStatus.isOK()) { - return appendCommandStatus(result, startSessionStatus); - } + lsCache->startSession(opCtx, record.get()); makeLogicalSessionToClient(record->getId()).serialize(&result); diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 3a1ce97ef39..f9ff378fc72 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -31,6 +31,7 @@ #include <boost/optional.hpp> #include "mongo/base/status.h" +#include "mongo/db/commands/end_sessions_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/refresh_sessions_gen.h" @@ -67,7 +68,7 @@ public: * should only be used when starting new sessions and should not be used to * insert records for existing sessions. */ - virtual Status startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0; + virtual void startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0; /** * Refresh the given sessions. Updates the timestamps of these records in @@ -85,6 +86,11 @@ public: virtual void vivify(OperationContext* opCtx, const LogicalSessionId& lsid) = 0; /** + * enqueues LogicalSessionIds for removal during the next _refresh() + */ + virtual void endSessions(const LogicalSessionIdSet& lsids) = 0; + + /** * Removes all local records in this cache. Does not remove the corresponding * authoritative session records from the sessions collection. */ diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index ffafcccbd4f..624934c2785 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -41,13 +41,10 @@ #include "mongo/util/duration.h" #include "mongo/util/log.h" #include "mongo/util/periodic_runner.h" +#include "mongo/util/scopeguard.h" namespace mongo { -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize, - int, - LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity); - MONGO_EXPORT_STARTUP_SERVER_PARAMETER( logicalSessionRefreshMinutes, int, @@ -55,7 +52,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER( MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false); -constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh; LogicalSessionCacheImpl::LogicalSessionCacheImpl( @@ -67,8 +63,7 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl( _sessionTimeout(options.sessionTimeout), _service(std::move(service)), _sessionsColl(std::move(collection)), - _transactionReaper(std::move(transactionReaper)), - _cache(options.capacity) { + _transactionReaper(std::move(transactionReaper)) { if (!disableLogicalSessionCacheRefresh) { _service->scheduleJob( {[this](Client* client) { _periodicRefresh(client); }, _refreshInterval}); @@ -88,30 +83,27 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { } Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - auto it = _cache.find(lsid); - if (it == _cache.end()) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + auto it = _activeSessions.find(lsid); + if (it == _activeSessions.end()) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; } - // Update the last use time before returning. - it->second.setLastUse(now()); return Status::OK(); } -Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { +void LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { // Add the new record to our local cache. We will insert it into the sessions collection // the next time _refresh is called. If there is already a record in the cache for this // session, we'll just write over it with our newer, more recent one. _addToCache(record); - return Status::OK(); } Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClient& cmd) { // Update the timestamps of all these records in our cache. auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx); - for (auto& lsid : sessions) { + for (const auto& lsid : sessions) { if (!promote(lsid).isOK()) { // This is a new record, insert it. _addToCache(makeLogicalSessionRecord(opCtx, lsid, now())); @@ -127,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, // Update the timestamps of all these records in our cache. auto records = cmd.getRefreshSessionsInternal(); - for (auto& record : records) { + for (const auto& record : records) { if (!promote(record.getId()).isOK()) { // This is a new record, insert it. _addToCache(record); @@ -136,17 +128,22 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, } // Write to the sessions collection now. - return _sessionsColl->refreshSessions(opCtx, toRefresh, now()); + return _sessionsColl->refreshSessions(opCtx, toRefresh); } void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { if (!promote(lsid).isOK()) { - startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())).ignore(); + startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())); } } Status LogicalSessionCacheImpl::refreshNow(Client* client) { - return _refresh(client); + try { + _refresh(client); + } catch (...) { + return exceptionToStatus(); + } + return Status::OK(); } Status LogicalSessionCacheImpl::reapNow(Client* client) { @@ -159,16 +156,15 @@ Date_t LogicalSessionCacheImpl::now() { size_t LogicalSessionCacheImpl::size() { stdx::lock_guard<stdx::mutex> lock(_cacheMutex); - return _cache.size(); + return _activeSessions.size(); } void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { - auto res = _refresh(client); - if (!res.isOK()) { - log() << "Failed to refresh session cache: " << res; + try { + _refresh(client); + } catch (...) { + log() << "Failed to refresh session cache: " << exceptionToStatus(); } - - return; } void LogicalSessionCacheImpl::_periodicReap(Client* client) { @@ -204,115 +200,123 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } -Status LogicalSessionCacheImpl::_refresh(Client* client) { - LogicalSessionRecordSet activeSessions; - LogicalSessionRecordSet deadSessions; - - auto time = now(); - - // We should avoid situations where we have records in the cache - // that have been expired from the sessions collection. If they haven't been - // used in _sessionTimeout, we should just remove them. +void LogicalSessionCacheImpl::_refresh(Client* client) { + LogicalSessionIdSet staleSessions; + LogicalSessionIdSet explicitlyEndingSessions; + LogicalSessionIdMap<LogicalSessionRecord> activeSessions; + + // backSwapper creates a guard that in the case of a exception + // replaces the ending or active sessions that swapped out of of LogicalSessionCache, + // and merges in any records that had been added since we swapped them + // out. + auto backSwapper = [this](auto& member, auto& temp) { + return MakeGuard([this, &member, &temp] { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + using std::swap; + swap(member, temp); + for (const auto& it : temp) { + member.emplace(it); + } + }); + }; - // Assemble a list of active session records in our cache - std::vector<decltype(_cache)::ListEntry> cacheCopy; { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - cacheCopy.assign(_cache.begin(), _cache.end()); + using std::swap; + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + swap(explicitlyEndingSessions, _endingSessions); + swap(activeSessions, _activeSessions); } + auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions); + auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions); - for (auto& it : cacheCopy) { - auto record = it.second; - if (!_isDead(record, time)) { - activeSessions.insert(record); - } else { - deadSessions.insert(record); + // get or make an opCtx + + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + + // remove all explicitlyEndingSessions from activeSessions + for (const auto& lsid : explicitlyEndingSessions) { + activeSessions.erase(lsid); } - // Append any active sessions from the service. We should always have - // cache entries for active sessions. If we don't, then it is a sign that - // the cache needs to be larger, because active session records are being - // evicted. - - // Promote our cached entries for all active service sessions to be recently- - // used, and update their lastUse dates so we don't lose them to eviction. We - // do not need to do this with records from our own cache, which are being used - // regularly. Sessions for long-running queries, however, must be kept alive - // by us here. - auto serviceSessions = _service->getActiveSessions(); - { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - for (auto lsid : serviceSessions) { - auto it = _cache.promote(lsid); - if (it != _cache.end()) { - // If we have not found our record, it may have been removed - // by another thread. - it->second.setLastUse(time); - activeSessions.insert(it->second); - } + // refresh all recently active sessions as well as for sessions attached to running ops - // TODO SERVER-29709: Rethink how active sessions interact with refreshes, - // and potentially move this block above the block where we separate - // dead sessions from live sessions, above. - activeSessions.insert(makeLogicalSessionRecord(lsid, time)); + LogicalSessionRecordSet activeSessionRecords{}; + + auto runningOpSessions = _service->getActiveOpSessions(); + + for (const auto& it : runningOpSessions) { + // if a running op is the cause of an upsert, we won't have a user name for the record + if (explicitlyEndingSessions.count(it) > 0) { + continue; } + LogicalSessionRecord lsr; + lsr.setId(it); + activeSessionRecords.insert(lsr); + } + for (const auto& it : activeSessions) { + activeSessionRecords.insert(it.second); } + // refresh the active sessions in the sessions collection + uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); + activeSessionsBackSwapper.Dismiss(); - // Query into the sessions collection to do the refresh. If any sessions have - // failed to refresh, it means their authoritative records were removed, and - // we should remove such records from our cache as well. - { - boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; - auto* const opCtx = [&client, &uniqueCtx] { - if (client->getOperationContext()) { - return client->getOperationContext(); - } + // remove the ending sessions from the sessions collection + uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); + explicitlyEndingBackSwaper.Dismiss(); - uniqueCtx.emplace(client->makeOperationContext()); - return uniqueCtx->get(); - }(); + // Find which running, but not recently active sessions, are expired, and add them + // to the list of sessions to kill cursors for + + KillAllSessionsByPatternSet patterns; + + auto openCursorSessions = _service->getOpenCursorSessions(); + // think about pruning ending and active out of openCursorSessions + auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); - auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time); - if (!res.isOK()) { - // TODO SERVER-29709: handle network errors here. - return res; + if (statusAndRemovedSessions.isOK()) { + auto removedSessions = statusAndRemovedSessions.getValue(); + for (const auto& lsid : removedSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); } } - // Prune any dead records out of the cache. Dead records are ones that failed to - // refresh, or ones that have expired locally. We don't make an effort to check - // if the locally-expired records still have live authoritative records in the - // sessions collection. We also don't attempt to resurrect our expired records. - // However, we *do* keep records alive if they are active on the service. - { - // TODO SERVER-29709: handle expiration separately from failure to refresh. - } + // Add all of the explicitly ended sessions to the list of sessions to kill cursors for - return Status::OK(); + for (const auto& lsid : explicitlyEndingSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); + } + SessionKiller::Matcher matcher(std::move(patterns)); + _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore(); } - void LogicalSessionCacheImpl::clear() { // TODO: What should this do? Wasn't implemented before MONGO_UNREACHABLE; } -bool LogicalSessionCacheImpl::_isDead(const LogicalSessionRecord& record, Date_t now) const { - return record.getLastUse() + _sessionTimeout < now; +void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _endingSessions.insert(begin(sessions), end(sessions)); } -boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache( - LogicalSessionRecord record) { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - return _cache.add(record.getId(), std::move(record)); +void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { + stdx::lock_guard<stdx::mutex> lk(_cacheMutex); + _activeSessions.insert(std::make_pair(record.getId(), record)); } std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); std::vector<LogicalSessionId> ret; - ret.reserve(_cache.size()); - for (const auto& id : _cache) { + ret.reserve(_activeSessions.size()); + for (const auto& id : _activeSessions) { ret.push_back(id.first); } return ret; @@ -322,7 +326,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( const std::vector<SHA256Block>& userDigests) const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); std::vector<LogicalSessionId> ret; - for (const auto& it : _cache) { + for (const auto& it : _activeSessions) { if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != userDigests.cend()) { ret.push_back(it.first); @@ -334,8 +338,8 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( const LogicalSessionId& id) const { stdx::lock_guard<stdx::mutex> lk(_cacheMutex); - const auto it = _cache.cfind(id); - if (it == _cache.cend()) { + const auto it = _activeSessions.find(id); + if (it == _activeSessions.end()) { return boost::none; } return it->second; diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 688e04a8a37..3c8557166d5 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -45,7 +45,6 @@ class Client; class OperationContext; class ServiceContext; -extern int logicalSessionRecordCacheSize; extern int logicalSessionRefreshMinutes; /** @@ -56,7 +55,6 @@ extern int logicalSessionRefreshMinutes; */ class LogicalSessionCacheImpl final : public LogicalSessionCache { public: - static constexpr int kLogicalSessionCacheDefaultCapacity = 10000; static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5); /** @@ -66,13 +64,6 @@ public: Options(){}; /** - * The number of session records to keep in the cache. - * - * May be set with --setParameter logicalSessionRecordCacheSize=X. - */ - int capacity = logicalSessionRecordCacheSize; - - /** * A timeout value to use for sessions in the cache, in minutes. * * By default, this is set to 30 minutes. @@ -108,7 +99,7 @@ public: Status promote(LogicalSessionId lsid) override; - Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override; + void startSession(OperationContext* opCtx, LogicalSessionRecord record) override; Status refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClient& cmd) override; @@ -134,13 +125,15 @@ public: boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override; + void endSessions(const LogicalSessionIdSet& sessions) override; + private: /** * Internal methods to handle scheduling and perform refreshes for active * session records contained within the cache. */ void _periodicRefresh(Client* client); - Status _refresh(Client* client); + void _refresh(Client* client); void _periodicReap(Client* client); Status _reap(Client* client); @@ -153,7 +146,7 @@ private: /** * Takes the lock and inserts the given record into the cache. */ - boost::optional<LogicalSessionRecord> _addToCache(LogicalSessionRecord record); + void _addToCache(LogicalSessionRecord record); const Minutes _refreshInterval; const Minutes _sessionTimeout; @@ -165,7 +158,12 @@ private: std::shared_ptr<TransactionReaper> _transactionReaper; mutable stdx::mutex _cacheMutex; - LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache; + + LogicalSessionIdMap<LogicalSessionRecord> _activeSessions; + + LogicalSessionIdSet _endingSessions; + + Date_t lastRefreshTime; }; } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index 077efa741b9..ad041d74ca4 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -45,9 +45,7 @@ public: return Status::OK(); } - Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override { - return Status::OK(); - } + void startSession(OperationContext* opCtx, LogicalSessionRecord record) override {} Status refreshSessions(OperationContext* opCtx, const RefreshSessionsCmdFromClient& cmd) override { @@ -90,6 +88,8 @@ public: boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override { return boost::none; } + + void endSessions(const LogicalSessionIdSet& lsids) override {} }; } // namespace mongo diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index cb8a226dea7..9001ab59a9a 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -28,6 +28,11 @@ #include "mongo/platform/basic.h" +#include "mongo/db/auth/authorization_manager.h" +#include "mongo/db/auth/authorization_session_for_test.h" +#include "mongo/db/auth/authz_manager_external_state_mock.h" +#include "mongo/db/auth/authz_session_external_state_mock.h" + #include "mongo/bson/oid.h" #include "mongo/db/auth/user_name.h" #include "mongo/db/logical_session_cache.h" @@ -62,6 +67,12 @@ public: _sessions(std::make_shared<MockSessionsCollectionImpl>()) {} void setUp() override { + auto localManagerState = stdx::make_unique<AuthzManagerExternalStateMock>(); + localManagerState.get()->setAuthzVersion(AuthorizationManager::schemaVersion28SCRAM); + auto uniqueAuthzManager = + stdx::make_unique<AuthorizationManager>(std::move(localManagerState)); + AuthorizationManager::set(&serviceContext, std::move(uniqueAuthzManager)); + auto client = serviceContext.makeClient("testClient"); _opCtx = client->makeOperationContext(); _client = client.get(); @@ -149,7 +160,7 @@ TEST_F(LogicalSessionCacheTest, PromoteUpdatesLastUse) { auto start = service()->now(); // Insert the record into the sessions collection with 'start' - ASSERT(cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start)).isOK()); + cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start)); // Fast forward time and promote service()->fastForward(Milliseconds(500)); @@ -189,8 +200,7 @@ TEST_F(LogicalSessionCacheTest, StartSession) { auto lsid = record.getId(); // Test starting a new session - auto res = cache()->startSession(opCtx(), record); - ASSERT(res.isOK()); + cache()->startSession(opCtx(), record); // Record will not be in the collection yet; refresh must happen first. ASSERT(!sessions()->has(lsid)); @@ -201,81 +211,27 @@ TEST_F(LogicalSessionCacheTest, StartSession) { ASSERT(sessions()->has(lsid)); // Try to start the same session again, should succeed. - res = cache()->startSession(opCtx(), record); - ASSERT(res.isOK()); + cache()->startSession(opCtx(), record); // Try to start a session that is already in the sessions collection but // is not in our local cache, should succeed. auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now()); sessions()->add(record2); - res = cache()->startSession(opCtx(), record2); - ASSERT(res.isOK()); + cache()->startSession(opCtx(), record2); // Try to start a session that has expired from our cache, and is no // longer in the sessions collection, should succeed service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); sessions()->remove(lsid); ASSERT(!sessions()->has(lsid)); - res = cache()->startSession(opCtx(), record); - ASSERT(res.isOK()); -} - -// Test that records in the cache are properly refreshed until they expire -TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { - // Insert two records into the cache - auto record1 = makeLogicalSessionRecordForTest(); - auto record2 = makeLogicalSessionRecordForTest(); - cache()->startSession(opCtx(), record1).transitional_ignore(); - cache()->startSession(opCtx(), record2).transitional_ignore(); - - stdx::promise<int> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Advance time to first refresh point, check that refresh happens, and - // that it includes both our records - sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) { - hitRefresh.set_value(sessions.size()); - return Status::OK(); - }); - - // Wait for the refresh to happen - clearOpCtx(); - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); - refreshFuture.wait(); - ASSERT_EQ(refreshFuture.get(), 2); - - sessions()->clearHooks(); - - stdx::promise<LogicalSessionId> refresh2; - auto refresh2Future = refresh2.get_future(); - - // Use one of the records - setOpCtx(); - auto res = cache()->promote(record1.getId()); - ASSERT(res.isOK()); - - // Advance time so that one record expires - // Ensure that first record was refreshed, and second was thrown away - sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) { - // We should only have one record here, the other should have expired - ASSERT_EQ(sessions.size(), size_t(1)); - refresh2.set_value(sessions.begin()->getId()); - return Status::OK(); - }); - - clearOpCtx(); - service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1)); - ASSERT(cache()->refreshNow(client()).isOK()); - refresh2Future.wait(); - ASSERT_EQ(refresh2Future.get(), record1.getId()); + cache()->startSession(opCtx(), record); } // Test that session cache properly expires lsids after 30 minutes of no use TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) { // Insert a lsid auto record = makeLogicalSessionRecordForTest(); - cache()->startSession(opCtx(), record).transitional_ignore(); + cache()->startSession(opCtx(), record); auto res = cache()->promote(record.getId()); ASSERT(res.isOK()); @@ -289,66 +245,12 @@ TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) { // ASSERT(!res.isOK()); } -// Test that we keep refreshing sessions that are active on the service -TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) { - auto lsid = makeLogicalSessionIdForTest(); - - // Insert one active lsid on the service, none in the cache - service()->add(lsid); - - int count = 0; - - sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) { - ASSERT_EQ(sessions.size(), size_t(1)); - ASSERT_EQ(sessions.begin()->getId(), lsid); - count++; - return Status::OK(); - }); - - clearOpCtx(); - - // Force a refresh, it should refresh our active session - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); - ASSERT_EQ(count, 1); - - // Force a session timeout, session is still on the service - service()->fastForward(kSessionTimeout); - ASSERT(cache()->refreshNow(client()).isOK()); - ASSERT_EQ(count, 2); - - // Force another refresh, check that it refreshes that active lsid again - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); - ASSERT_EQ(count, 3); -} - -// Test that the set of lsids we refresh is a sum of cached + active lsids -TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) { - // Put one session into the cache, one into the service - auto lsid1 = makeLogicalSessionIdForTest(); - service()->add(lsid1); - auto record2 = makeLogicalSessionRecordForTest(); - cache()->startSession(opCtx(), record2).transitional_ignore(); - - // Both signedLsids refresh - sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) { - ASSERT_EQ(sessions.size(), size_t(2)); - return Status::OK(); - }); - - // Force a refresh - clearOpCtx(); - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); -} - // Test large sets of cache-only session lsids TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) { - int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; + int count = 10000; for (int i = 0; i < count; i++) { auto record = makeLogicalSessionRecordForTest(); - cache()->startSession(opCtx(), record).transitional_ignore(); + cache()->startSession(opCtx(), record); } // Check that all signedLsids refresh @@ -363,72 +265,140 @@ TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) { ASSERT(cache()->refreshNow(client()).isOK()); } -// Test larger sets of service-only session lsids -TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) { - int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; - for (int i = 0; i < count; i++) { - auto lsid = makeLogicalSessionIdForTest(); - service()->add(lsid); - } - - // Check that all signedLsids refresh - sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) { - ASSERT_EQ(sessions.size(), size_t(count)); - return Status::OK(); - }); - - // Force a refresh - clearOpCtx(); - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); -} +// +TEST_F(LogicalSessionCacheTest, RefreshMatrixSessionState) { + const std::vector<std::vector<std::string>> stateNames = { + {"active", "inactive"}, + {"running", "not running"}, + {"expired", "unexpired"}, + {"ended", "not ended"}, + {"cursor", "no cursor"}, + }; + struct { + // results that we test for after the _refresh + bool inCollection; + bool killed; + } testCases[] = { + // 0, active, running, expired, ended, cursor + {false, true}, + // 1, inactive, running, expired, ended, cursor + {false, true}, + // 2, active, not running, expired, ended, cursor + {false, true}, + // 3, inactive, not running, expired, ended, cursor + {false, true}, + // 4, active, running, unexpired, ended, cursor + {false, true}, + // 5, inactive, running, unexpired, ended, cursor + {false, true}, + // 6, active, not running, unexpired, ended, cursor + {false, true}, + // 7, inactive, not running, unexpired, ended, cursor + {false, true}, + // 8, active, running, expired, not ended, cursor + {true, false}, + // 9, inactive, running, expired, not ended, cursor + {false, true}, + // 10, active, not running, expired, not ended, cursor + {true, false}, + // 11, inactive, not running, expired, not ended, cursor + {false, true}, + // 12, active, running, unexpired, not ended, cursor + {true, false}, + // 13, inactive, running, unexpired, not ended, cursor + {true, false}, + // 14, active, not running, unexpired, not ended, cursor + {true, false}, + // 15, inactive, not running, unexpired, not ended, cursor + {true, false}, + // 16, active, running, expired, ended, no cursor + {false, true}, + // 17, inactive, running, expired, ended, no cursor + {false, true}, + // 18, active, not running, expired, ended, no cursor + {false, true}, + // 19, inactive, not running, expired, ended, no cursor + {false, true}, + // 20, active, running, unexpired, ended, no cursor + {false, true}, + // 21, inactive, running, unexpired, ended, no cursor + {false, true}, + // 22, active, not running, unexpired, ended, no cursor + {false, true}, + // 23, inactive, not running, unexpired, ended, no cursor + {false, true}, + // 24, active, running, expired, not ended, no cursor + {true, false}, + // 25, inactive, running, expired, not ended, no cursor + {false, true}, + // 26, active, not running, expired, not ended, no cursor + {true, false}, + // 27, inactive, not running, expired, not ended, no cursor + {false, false}, + // 28, active, running, unexpired, not ended, no cursor + {true, false}, + // 29, inactive, running, unexpired, not ended, no cursor + {true, false}, + // 30, active, not running, unexpired, not ended, no cursor + {true, false}, + // 31, inactive, not running, unexpired, not ended, no cursor + {true, false}, + }; + + std::vector<LogicalSessionId> ids; + for (int i = 0; i < 32; i++) { + + bool active = !(i & 1); + bool running = !(i & 2); + bool expired = !(i & 4); + bool ended = !(i & 8); + bool cursor = !(i & 16); -// Test larger mixed sets of cache/service active sessions -TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) { - int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity; - for (int i = 0; i < count; i++) { auto lsid = makeLogicalSessionIdForTest(); - service()->add(lsid); + ids.push_back(lsid); + auto lsRecord = makeLogicalSessionRecord(lsid, service()->now() + Milliseconds(500)); - auto record2 = makeLogicalSessionRecordForTest(); - cache()->startSession(opCtx(), record2).transitional_ignore(); + if (running) { + service()->add(lsid); + } + if (active) { + cache()->startSession(opCtx(), lsRecord); + } + if (!expired) { + sessions()->add(lsRecord); + } + if (ended) { + LogicalSessionIdSet lsidSet; + lsidSet.emplace(lsid); + cache()->endSessions(lsidSet); + } + if (cursor) { + service()->addCursorSession(lsid); + } } - int nRefreshed = 0; - - // Check that all lsids refresh successfully - sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { - nRefreshed = sessions.size(); - return Status::OK(); - }); - // Force a refresh clearOpCtx(); service()->fastForward(kForceRefresh); ASSERT(cache()->refreshNow(client()).isOK()); - ASSERT_EQ(nRefreshed, count * 2); - - // Remove all of the service sessions, should just refresh the cache entries - service()->clear(); - sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { - nRefreshed = sessions.size(); - return Status::OK(); - }); - - // Force another refresh - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); - - // We should not have refreshed any sessions from the service, only the cache - ASSERT_EQ(nRefreshed, count); - // Force a third refresh - service()->fastForward(kForceRefresh); - ASSERT(cache()->refreshNow(client()).isOK()); - - // Again, we should have only refreshed sessions from the cache - ASSERT_EQ(nRefreshed, count); + for (int i = 0; i < 32; i++) { + std::stringstream failText; + failText << "case " << i << " : "; + for (int j = 0; j < 4; j++) { + failText << stateNames[j][i >> j & 1] << " "; + } + failText << " session case failed: "; + + ASSERT(sessions()->has(ids[i]) == testCases[i].inCollection) + << failText.str() << (testCases[i].inCollection ? "session wasn't in collection" + : "session was in collection"); + ASSERT((service()->matchKilled(ids[i]) != nullptr) == testCases[i].killed) + << failText.str() + << (testCases[i].killed ? "session wasn't killed" : "session was killed"); + } } + } // namespace } // namespace mongo diff --git a/src/mongo/db/service_liason.h b/src/mongo/db/service_liason.h index ed7bcf0df86..b51076fefe9 100644 --- a/src/mongo/db/service_liason.h +++ b/src/mongo/db/service_liason.h @@ -29,6 +29,7 @@ #pragma once #include "mongo/db/logical_session_id.h" +#include "mongo/db/session_killer.h" #include "mongo/stdx/functional.h" #include "mongo/util/periodic_runner.h" #include "mongo/util/time_support.h" @@ -52,7 +53,12 @@ public: * Return a list of sessions that are currently being used to run operations * on this service. */ - virtual LogicalSessionIdSet getActiveSessions() const = 0; + virtual LogicalSessionIdSet getActiveOpSessions() const = 0; + + /** + * Return a list of sessions that are currently attached to open cursors + */ + virtual LogicalSessionIdSet getOpenCursorSessions() const = 0; /** * Schedule a job to be run at regular intervals until the server shuts down. @@ -76,6 +82,12 @@ public: */ virtual Date_t now() const = 0; + /** + * deligaes to a similarly named function on a cursormanager + */ + virtual Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) = 0; + protected: /** * Returns the service context. diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp index 821de302e02..df0d8ab5276 100644 --- a/src/mongo/db/service_liason_mock.cpp +++ b/src/mongo/db/service_liason_mock.cpp @@ -42,11 +42,16 @@ MockServiceLiasonImpl::MockServiceLiasonImpl() { _runner->startup().transitional_ignore(); } -LogicalSessionIdSet MockServiceLiasonImpl::getActiveSessions() const { +LogicalSessionIdSet MockServiceLiasonImpl::getActiveOpSessions() const { stdx::unique_lock<stdx::mutex> lk(_mutex); return _activeSessions; } +LogicalSessionIdSet MockServiceLiasonImpl::getOpenCursorSessions() const { + stdx::unique_lock<stdx::mutex> lk(_mutex); + return _cursorSessions; +} + void MockServiceLiasonImpl::join() { _runner->shutdown(); } @@ -60,9 +65,25 @@ void MockServiceLiasonImpl::scheduleJob(PeriodicRunner::PeriodicJob job) { return; } + +void MockServiceLiasonImpl::addCursorSession(LogicalSessionId lsid) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cursorSessions.insert(std::move(lsid)); +} + +void MockServiceLiasonImpl::removeCursorSession(LogicalSessionId lsid) { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cursorSessions.erase(lsid); +} + +void MockServiceLiasonImpl::clearCursorSession() { + stdx::unique_lock<stdx::mutex> lk(_mutex); + _cursorSessions.clear(); +} + void MockServiceLiasonImpl::add(LogicalSessionId lsid) { stdx::unique_lock<stdx::mutex> lk(_mutex); - _activeSessions.insert(std::move(lsid)); + _cursorSessions.insert(std::move(lsid)); } void MockServiceLiasonImpl::remove(LogicalSessionId lsid) { @@ -83,4 +104,15 @@ int MockServiceLiasonImpl::jobs() { return _timerFactory->jobs(); } +const KillAllSessionsByPattern* MockServiceLiasonImpl::matchKilled(const LogicalSessionId& lsid) { + return _matcher->match(lsid); +} + +Status MockServiceLiasonImpl::killCursorsWithMatchingSessions( + OperationContext* opCtx, const SessionKiller::Matcher& matcher) { + + _matcher = matcher; + return Status::OK(); +} + } // namespace mongo diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h index 39323057e59..f7b6f0be535 100644 --- a/src/mongo/db/service_liason_mock.h +++ b/src/mongo/db/service_liason_mock.h @@ -59,7 +59,8 @@ public: MockServiceLiasonImpl(); // Forwarding methods from the MockServiceLiason - LogicalSessionIdSet getActiveSessions() const; + LogicalSessionIdSet getActiveOpSessions() const; + LogicalSessionIdSet getOpenCursorSessions() const; Date_t now() const; void scheduleJob(PeriodicRunner::PeriodicJob job); void join(); @@ -68,15 +69,27 @@ public: void add(LogicalSessionId lsid); void remove(LogicalSessionId lsid); void clear(); + + void addCursorSession(LogicalSessionId lsid); + void removeCursorSession(LogicalSessionId lsid); + void clearCursorSession(); + void fastForward(Milliseconds time); int jobs(); + const KillAllSessionsByPattern* matchKilled(const LogicalSessionId& lsid); + Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher); + private: executor::AsyncTimerFactoryMock* _timerFactory; std::unique_ptr<PeriodicRunnerASIO> _runner; + boost::optional<SessionKiller::Matcher> _matcher; + mutable stdx::mutex _mutex; LogicalSessionIdSet _activeSessions; + LogicalSessionIdSet _cursorSessions; }; /** @@ -87,8 +100,12 @@ public: explicit MockServiceLiason(std::shared_ptr<MockServiceLiasonImpl> impl) : _impl(std::move(impl)) {} - LogicalSessionIdSet getActiveSessions() const override { - return _impl->getActiveSessions(); + LogicalSessionIdSet getActiveOpSessions() const override { + return _impl->getActiveOpSessions(); + } + + LogicalSessionIdSet getOpenCursorSessions() const override { + return _impl->getOpenCursorSessions(); } Date_t now() const override { @@ -103,6 +120,11 @@ public: return _impl->join(); } + Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { + return _impl->killCursorsWithMatchingSessions(opCtx, matcher); + } + protected: ServiceContext* _context() override { return _serviceContext.get(); diff --git a/src/mongo/db/service_liason_mongod.cpp b/src/mongo/db/service_liason_mongod.cpp index 7fc58d53d24..8c4ad33a987 100644 --- a/src/mongo/db/service_liason_mongod.cpp +++ b/src/mongo/db/service_liason_mongod.cpp @@ -40,7 +40,7 @@ namespace mongo { -LogicalSessionIdSet ServiceLiasonMongod::getActiveSessions() const { +LogicalSessionIdSet ServiceLiasonMongod::getActiveOpSessions() const { LogicalSessionIdSet activeSessions; invariant(hasGlobalServiceContext()); @@ -62,25 +62,26 @@ LogicalSessionIdSet ServiceLiasonMongod::getActiveSessions() const { activeSessions.insert(*lsid); } } + return activeSessions; +} +LogicalSessionIdSet ServiceLiasonMongod::getOpenCursorSessions() const { + LogicalSessionIdSet cursorSessions; // Append any in-use session ids from the global and collection-level cursor managers - { - boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; - auto client = Client::getCurrent(); + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto client = Client::getCurrent(); - auto* const opCtx = [&client, &uniqueCtx] { - if (client->getOperationContext()) { - return client->getOperationContext(); - } - - uniqueCtx.emplace(client->makeOperationContext()); - return uniqueCtx->get(); - }(); + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } - CursorManager::appendAllActiveSessions(opCtx, &activeSessions); - } + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); - return activeSessions; + CursorManager::appendAllActiveSessions(opCtx, &cursorSessions); + return cursorSessions; } void ServiceLiasonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) { @@ -102,4 +103,9 @@ ServiceContext* ServiceLiasonMongod::_context() { return getGlobalServiceContext(); } +Status ServiceLiasonMongod::killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { + return CursorManager::getGlobalCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher); +} + } // namespace mongo diff --git a/src/mongo/db/service_liason_mongod.h b/src/mongo/db/service_liason_mongod.h index 31304b94573..3feb502c437 100644 --- a/src/mongo/db/service_liason_mongod.h +++ b/src/mongo/db/service_liason_mongod.h @@ -50,7 +50,8 @@ namespace mongo { */ class ServiceLiasonMongod : public ServiceLiason { public: - LogicalSessionIdSet getActiveSessions() const override; + LogicalSessionIdSet getActiveOpSessions() const override; + LogicalSessionIdSet getOpenCursorSessions() const override; void scheduleJob(PeriodicRunner::PeriodicJob job) override; @@ -58,6 +59,9 @@ public: Date_t now() const override; + Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) override; + protected: /** * Returns the service context. diff --git a/src/mongo/db/service_liason_mongos.cpp b/src/mongo/db/service_liason_mongos.cpp index bf0bb81b70a..37c5420d28d 100644 --- a/src/mongo/db/service_liason_mongos.cpp +++ b/src/mongo/db/service_liason_mongos.cpp @@ -39,7 +39,7 @@ namespace mongo { -LogicalSessionIdSet ServiceLiasonMongos::getActiveSessions() const { +LogicalSessionIdSet ServiceLiasonMongos::getActiveOpSessions() const { LogicalSessionIdSet activeSessions; invariant(hasGlobalServiceContext()); @@ -51,6 +51,12 @@ LogicalSessionIdSet ServiceLiasonMongos::getActiveSessions() const { return activeSessions; } +LogicalSessionIdSet ServiceLiasonMongos::getOpenCursorSessions() const { + LogicalSessionIdSet openCursorSessions; + + return openCursorSessions; +} + void ServiceLiasonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) { invariant(hasGlobalServiceContext()); getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job)); @@ -70,4 +76,10 @@ ServiceContext* ServiceLiasonMongos::_context() { return getGlobalServiceContext(); } +Status ServiceLiasonMongos::killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) { + auto cursorManager = Grid::get(getGlobalServiceContext())->getCursorManager(); + return cursorManager->killCursorsWithMatchingSessions(opCtx, matcher); +} + } // namespace mongo diff --git a/src/mongo/db/service_liason_mongos.h b/src/mongo/db/service_liason_mongos.h index 79bc5df3fa7..26780f7b702 100644 --- a/src/mongo/db/service_liason_mongos.h +++ b/src/mongo/db/service_liason_mongos.h @@ -50,7 +50,8 @@ namespace mongo { */ class ServiceLiasonMongos : public ServiceLiason { public: - LogicalSessionIdSet getActiveSessions() const override; + LogicalSessionIdSet getActiveOpSessions() const override; + LogicalSessionIdSet getOpenCursorSessions() const override; void scheduleJob(PeriodicRunner::PeriodicJob job) override; @@ -58,6 +59,9 @@ public: Date_t now() const override; + Status killCursorsWithMatchingSessions(OperationContext* opCtx, + const SessionKiller::Matcher& matcher) override; + protected: /** * Returns the service context. diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index b6341eb0423..69e420f40b8 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -55,15 +55,15 @@ BSONObj lsidQuery(const LogicalSessionRecord& record) { return lsidQuery(record.getId()); } -BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) { +BSONObj updateQuery(const LogicalSessionRecord& record) { // { $max : { lastUse : <time> }, $setOnInsert : { user : <user> } } // Build our update doc. BSONObjBuilder updateBuilder; { - BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$max")); - maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, refreshTime); + BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$currentDate")); + maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, true); } if (record.getUser()) { @@ -200,17 +200,15 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N Status SessionsCollection::doRefresh(const NamespaceString& ns, const LogicalSessionRecordSet& sessions, - Date_t refreshTime, SendBatchFn send) { auto init = [ns](BSONObjBuilder* batch) { batch->append("update", ns.coll()); batch->append("ordered", false); }; - auto add = [&refreshTime](BSONArrayBuilder* entries, const LogicalSessionRecord& record) { - entries->append(BSON("q" << lsidQuery(record) << "u" << updateQuery(record, refreshTime) - << "upsert" - << true)); + auto add = [](BSONArrayBuilder* entries, const LogicalSessionRecord& record) { + entries->append( + BSON("q" << lsidQuery(record) << "u" << updateQuery(record) << "upsert" << true)); }; return runBulkCmd("updates", init, add, send, sessions); @@ -218,12 +216,12 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns, Status SessionsCollection::doRefreshExternal(const NamespaceString& ns, const LogicalSessionRecordSet& sessions, - Date_t refreshTime, SendBatchFn send) { auto makeT = [] { return std::vector<LogicalSessionRecord>{}; }; - auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch, - const LogicalSessionRecord& record) { batch.push_back(record); }; + auto add = [](std::vector<LogicalSessionRecord>& batch, const LogicalSessionRecord& record) { + batch.push_back(record); + }; auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) { RefreshSessionsCmdFromClusterMember idl; diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index e86e29cd67b..9d29924069c 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -60,8 +60,7 @@ public: * or equal to the given time. Returns an error if a networking issue occurred. */ virtual Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) = 0; + const LogicalSessionRecordSet& sessions) = 0; /** * Removes the authoritative records for the specified sessions. @@ -99,11 +98,9 @@ protected: */ Status doRefresh(const NamespaceString& ns, const LogicalSessionRecordSet& sessions, - Date_t refreshTime, SendBatchFn send); Status doRefreshExternal(const NamespaceString& ns, const LogicalSessionRecordSet& sessions, - Date_t refreshTime, SendBatchFn send); /** diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp index b70f8e015bf..7f2cff63e70 100644 --- a/src/mongo/db/sessions_collection_mock.cpp +++ b/src/mongo/db/sessions_collection_mock.cpp @@ -103,4 +103,16 @@ Status MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& ses return Status::OK(); } +StatusWith<LogicalSessionIdSet> MockSessionsCollectionImpl::findRemovedSessions( + OperationContext* opCtx, const LogicalSessionIdSet& sessions) { + LogicalSessionIdSet lsids; + stdx::unique_lock<stdx::mutex> lk(_mutex); + for (auto& lsid : sessions) { + if (_sessions.find(lsid) == _sessions.end()) { + lsids.emplace(lsid); + } + } + return lsids; +} + } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index 6a88a3af816..fb20533ed60 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -79,6 +79,9 @@ public: void clearSessions(); const SessionMap& sessions() const; + StatusWith<LogicalSessionIdSet> findRemovedSessions(OperationContext* opCtx, + const LogicalSessionIdSet& sessions); + private: // Default implementations, may be overridden with custom hooks. Status _refreshSessions(const LogicalSessionRecordSet& sessions); @@ -102,8 +105,7 @@ public: : _impl(std::move(impl)) {} Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) override { + const LogicalSessionRecordSet& sessions) override { return _impl->refreshSessions(sessions); } @@ -113,7 +115,7 @@ public: StatusWith<LogicalSessionIdSet> findRemovedSessions( OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { - return LogicalSessionIdSet{}; + return _impl->findRemovedSessions(opCtx, sessions); } Status removeTransactionRecords(OperationContext* opCtx, diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp index f4d5f44b453..4cfbffff3c3 100644 --- a/src/mongo/db/sessions_collection_rs.cpp +++ b/src/mongo/db/sessions_collection_rs.cpp @@ -132,8 +132,7 @@ auto dispatch(const NamespaceString& ns, } // namespace Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) { + const LogicalSessionRecordSet& sessions) { return dispatch( kSessionsNamespaceString, MODE_IX, @@ -142,13 +141,11 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx, DBDirectClient client(opCtx); return doRefresh(kSessionsNamespaceString, sessions, - refreshTime, makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); }, [&](DBClientBase* client) { return doRefreshExternal(kSessionsNamespaceString, sessions, - refreshTime, makeSendFnForCommand(kSessionsNamespaceString, client)); }); } diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h index cc49ecf511b..08459d6ff4f 100644 --- a/src/mongo/db/sessions_collection_rs.h +++ b/src/mongo/db/sessions_collection_rs.h @@ -58,8 +58,7 @@ public: * or equal to the current time. */ Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) override; + const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp index 544915c8494..38738207196 100644 --- a/src/mongo/db/sessions_collection_sharded.cpp +++ b/src/mongo/db/sessions_collection_sharded.cpp @@ -53,8 +53,7 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) { } // namespace Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) { + const LogicalSessionRecordSet& sessions) { auto send = [&](BSONObj toSend) { auto opMsg = OpMsgRequest::fromDBAndBody(SessionsCollection::kSessionsDb, toSend); auto request = BatchedCommandRequest::parseUpdate(opMsg); @@ -72,7 +71,7 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx, return Status(error, response.getErrMessage()); }; - return doRefresh(kSessionsNamespaceString, sessions, refreshTime, send); + return doRefresh(kSessionsNamespaceString, sessions, send); } Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx, diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h index f9a8165a1c7..01ac730e09a 100644 --- a/src/mongo/db/sessions_collection_sharded.h +++ b/src/mongo/db/sessions_collection_sharded.h @@ -48,8 +48,7 @@ public: * or equal to the current time. */ Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) override; + const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp index ada3583bb26..84c2b4daae8 100644 --- a/src/mongo/db/sessions_collection_standalone.cpp +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -45,12 +45,10 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) { } // namespace Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) { + const LogicalSessionRecordSet& sessions) { DBDirectClient client(opCtx); return doRefresh(kSessionsNamespaceString, sessions, - refreshTime, makeSendFnForBatchWrite(kSessionsNamespaceString, &client)); } diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h index 2ecd49afa79..ad5f155dd64 100644 --- a/src/mongo/db/sessions_collection_standalone.h +++ b/src/mongo/db/sessions_collection_standalone.h @@ -47,8 +47,7 @@ public: * or equal to the current time. */ Status refreshSessions(OperationContext* opCtx, - const LogicalSessionRecordSet& sessions, - Date_t refreshTime) override; + const LogicalSessionRecordSet& sessions) override; /** * Removes the authoritative records for the specified sessions. diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp index 51614181623..13ca0c434c8 100644 --- a/src/mongo/dbtests/logical_sessions_tests.cpp +++ b/src/mongo/dbtests/logical_sessions_tests.cpp @@ -153,27 +153,27 @@ public: auto thePast = now - Minutes(5); // Attempt to refresh with no active records, should succeed (and do nothing). - auto resRefresh = collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{}, now); + auto resRefresh = collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{}); ASSERT(resRefresh.isOK()); // Attempt to refresh one active record, should succeed. auto record1 = makeRecord(thePast); auto res = insertRecord(opCtx(), record1); ASSERT_OK(res); - resRefresh = collection()->refreshSessions(opCtx(), {record1}, now); + resRefresh = collection()->refreshSessions(opCtx(), {record1}); ASSERT(resRefresh.isOK()); // The timestamp on the refreshed record should be updated. auto swRecord = fetchRecord(opCtx(), record1.getId()); ASSERT(swRecord.isOK()); - ASSERT_EQ(swRecord.getValue().getLastUse(), now); + ASSERT_GTE(swRecord.getValue().getLastUse(), now); // Clear the collection. db.remove(ns(), BSONObj()); // Attempt to refresh a record that is not present, should upsert it. auto record2 = makeRecord(thePast); - resRefresh = collection()->refreshSessions(opCtx(), {record2}, now); + resRefresh = collection()->refreshSessions(opCtx(), {record2}); ASSERT(resRefresh.isOK()); swRecord = fetchRecord(opCtx(), record2.getId()); @@ -185,23 +185,26 @@ public: // Attempt a refresh of many records, split into batches. LogicalSessionRecordSet toRefresh; int recordCount = 5000; + unsigned int notRefreshed = 0; for (int i = 0; i < recordCount; i++) { - auto record = makeRecord(thePast); + auto record = makeRecord(now); res = insertRecord(opCtx(), record); // Refresh some of these records. if (i % 4 == 0) { toRefresh.insert(record); + } else { + notRefreshed++; } } // Run the refresh, should succeed. - resRefresh = collection()->refreshSessions(opCtx(), toRefresh, now); + resRefresh = collection()->refreshSessions(opCtx(), toRefresh); ASSERT(resRefresh.isOK()); // Ensure that the right number of timestamps were updated. auto n = db.count(ns(), BSON("lastUse" << now)); - ASSERT_EQ(n, toRefresh.size()); + ASSERT_EQ(n, notRefreshed); } }; |