diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-05-16 17:42:25 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2019-05-21 17:42:12 -0400 |
commit | 65656c4bc9390c52205b73451abff0b7b6b74396 (patch) | |
tree | 6892a9719c44c38af42a6ecabc0984a78fe753cb /src/mongo | |
parent | 107dd43e99f191c541c0c0faab40ff0d9bc47550 (diff) | |
download | mongo-65656c4bc9390c52205b73451abff0b7b6b74396.tar.gz |
SERVER-41193 Get rid of the `refreshSessionsInternal` command
Diffstat (limited to 'src/mongo')
-rw-r--r-- | src/mongo/db/SConscript | 1 | ||||
-rw-r--r-- | src/mongo/db/commands/SConscript | 3 | ||||
-rw-r--r-- | src/mongo/db/commands/end_sessions_command.cpp | 13 | ||||
-rw-r--r-- | src/mongo/db/commands/refresh_sessions_command.cpp | 11 | ||||
-rw-r--r-- | src/mongo/db/commands/refresh_sessions_command_internal.cpp | 89 | ||||
-rw-r--r-- | src/mongo/db/commands/sessions_commands.idl (renamed from src/mongo/db/commands/end_sessions.idl) | 12 | ||||
-rw-r--r-- | src/mongo/db/commands/start_session_command.cpp | 29 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache.h | 30 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp | 68 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.cpp-c26ee759 | 435 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_impl.h | 10 | ||||
-rw-r--r-- | src/mongo/db/logical_session_cache_noop.h | 14 | ||||
-rw-r--r-- | src/mongo/db/refresh_sessions.idl | 52 | ||||
-rw-r--r-- | src/mongo/db/sessions_collection.cpp | 1 | ||||
-rw-r--r-- | src/mongo/s/commands/cluster_create_cmd.cpp | 16 |
15 files changed, 527 insertions, 257 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index de3ecb0da52..8d789440a4f 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -1319,7 +1319,6 @@ env.Library( source=[ 'logical_session_id.cpp', env.Idlc('logical_session_id.idl')[0], - env.Idlc('refresh_sessions.idl')[0], ], LIBDEPS=[ '$BUILD_DIR/mongo/base', diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript index 2d1be53c64f..ef2e9fa712c 100644 --- a/src/mongo/db/commands/SConscript +++ b/src/mongo/db/commands/SConscript @@ -106,8 +106,8 @@ env.Library( 'refresh_sessions_command.cpp', 'rename_collection_common.cpp', 'start_session_command.cpp', - env.Idlc('end_sessions.idl')[0], env.Idlc('parameters.idl')[0], + env.Idlc('sessions_commands.idl')[0], ], LIBDEPS_PRIVATE=[ '$BUILD_DIR/mongo/bson/mutable/mutable_bson', @@ -143,7 +143,6 @@ env.Library( 'logical_session_server_status_section.cpp', 'mr_common.cpp', 'reap_logical_session_cache_now.cpp', - 'refresh_sessions_command_internal.cpp', 'traffic_recording_cmds.cpp', 'user_management_commands_common.cpp', env.Idlc('drop_connections.idl')[0], diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp index b8325015cf1..15c19f907d8 100644 --- a/src/mongo/db/commands/end_sessions_command.cpp +++ b/src/mongo/db/commands/end_sessions_command.cpp @@ -33,7 +33,7 @@ #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" -#include "mongo/db/commands/end_sessions_gen.h" +#include "mongo/db/commands/sessions_commands_gen.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" @@ -51,15 +51,19 @@ public: AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } + bool adminOnly() const override { return false; } + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } + std::string help() const override { return "end a set of logical sessions"; } + Status checkAuthForOperation(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj) const override { @@ -79,11 +83,12 @@ public: const std::string& db, const BSONObj& cmdObj, BSONObjBuilder& result) override { - auto lsCache = LogicalSessionCache::get(opCtx); + auto endSessionsRequest = EndSessionsCmdFromClient::parse( + IDLParserErrorContext("EndSessionsCmdFromClient"), cmdObj); - auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj); + LogicalSessionCache::get(opCtx)->endSessions( + makeLogicalSessionIds(endSessionsRequest.getSessions(), opCtx)); - lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx)); return true; } diff --git a/src/mongo/db/commands/refresh_sessions_command.cpp b/src/mongo/db/commands/refresh_sessions_command.cpp index 8e2c663e6cc..a74ddc04589 100644 --- a/src/mongo/db/commands/refresh_sessions_command.cpp +++ b/src/mongo/db/commands/refresh_sessions_command.cpp @@ -29,15 +29,14 @@ #include "mongo/platform/basic.h" -#include "mongo/base/init.h" #include "mongo/db/auth/authorization_session.h" #include "mongo/db/client.h" #include "mongo/db/commands.h" +#include "mongo/db/commands/sessions_commands_gen.h" #include "mongo/db/jsobj.h" #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" -#include "mongo/db/refresh_sessions_gen.h" namespace mongo { namespace { @@ -81,9 +80,11 @@ public: const std::string& db, const BSONObj& cmdObj, BSONObjBuilder& result) override { - IDLParserErrorContext ctx("RefreshSessionsCmdFromClient"); - auto cmd = RefreshSessionsCmdFromClient::parse(ctx, cmdObj); - uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions(opCtx, cmd)); + auto refreshSessionsRequest = RefreshSessionsCmdFromClient::parse( + IDLParserErrorContext("RefreshSessionsCmdFromClient"), cmdObj); + + uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions( + opCtx, refreshSessionsRequest.getSessions())); return true; } diff --git a/src/mongo/db/commands/refresh_sessions_command_internal.cpp b/src/mongo/db/commands/refresh_sessions_command_internal.cpp deleted file mode 100644 index aa3ebb4b194..00000000000 --- a/src/mongo/db/commands/refresh_sessions_command_internal.cpp +++ /dev/null @@ -1,89 +0,0 @@ -/** - * Copyright (C) 2018-present MongoDB, Inc. - * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the Server Side Public License, version 1, - * as published by MongoDB, Inc. - * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * Server Side Public License for more details. - * - * You should have received a copy of the Server Side Public License - * along with this program. If not, see - * <http://www.mongodb.com/licensing/server-side-public-license>. - * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the Server Side Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. - */ - -#include "mongo/platform/basic.h" - -#include "mongo/base/init.h" -#include "mongo/db/auth/authorization_session.h" -#include "mongo/db/client.h" -#include "mongo/db/commands.h" -#include "mongo/db/jsobj.h" -#include "mongo/db/logical_session_cache.h" -#include "mongo/db/logical_session_id_helpers.h" -#include "mongo/db/operation_context.h" -#include "mongo/db/refresh_sessions_gen.h" - -namespace mongo { - -class RefreshSessionsCommandInternal final : public BasicCommand { - RefreshSessionsCommandInternal(const RefreshSessionsCommandInternal&) = delete; - RefreshSessionsCommandInternal& operator=(const RefreshSessionsCommandInternal&) = delete; - -public: - RefreshSessionsCommandInternal() : BasicCommand("refreshSessionsInternal") {} - - AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { - return AllowedOnSecondary::kAlways; - } - bool adminOnly() const override { - return false; - } - bool supportsWriteConcern(const BSONObj& cmd) const override { - return false; - } - std::string help() const override { - return "renew a set of logical sessions"; - } - Status checkAuthForOperation(OperationContext* opCtx, - const std::string& dbname, - const BSONObj& cmdObj) const override { - // Must be authenticated as an internal cluster member. - auto authSession = AuthorizationSession::get(opCtx->getClient()); - if (!authSession->isAuthorizedForPrivilege( - Privilege(ResourcePattern::forClusterResource(), ActionType::impersonate))) { - return {ErrorCodes::Unauthorized, "unauthorized"}; - } - return Status::OK(); - } - - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - IDLParserErrorContext ctx("RefreshSessionsCmdFromClusterMember"); - auto cmd = RefreshSessionsCmdFromClusterMember::parse(ctx, cmdObj); - auto res = - LogicalSessionCache::get(opCtx->getServiceContext())->refreshSessions(opCtx, cmd); - uassertStatusOK(res); - - return true; - } -} refreshSessionsCommandInternal; - -} // namespace mongo diff --git a/src/mongo/db/commands/end_sessions.idl b/src/mongo/db/commands/sessions_commands.idl index 2dd7cfceffb..706537fff79 100644 --- a/src/mongo/db/commands/end_sessions.idl +++ b/src/mongo/db/commands/sessions_commands.idl @@ -38,4 +38,14 @@ structs: description: "A struct representing an endSessions command from a client" strict: false fields: - endSessions: array<LogicalSessionFromClient> + endSessions: + type: array<LogicalSessionFromClient> + cpp_name: sessions + + RefreshSessionsCmdFromClient: + description: "A struct representing a refreshSessions command from a client" + strict: false + fields: + refreshSessions: + type: array<LogicalSessionFromClient> + cpp_name: sessions diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp index a9e64c9d5c4..02293532a34 100644 --- a/src/mongo/db/commands/start_session_command.cpp +++ b/src/mongo/db/commands/start_session_command.cpp @@ -45,6 +45,7 @@ #include "mongo/db/stats/top.h" namespace mongo { +namespace { class StartSessionCommand final : public BasicCommand { StartSessionCommand(const StartSessionCommand&) = delete; @@ -56,37 +57,43 @@ public: AllowedOnSecondary secondaryAllowed(ServiceContext*) const override { return AllowedOnSecondary::kAlways; } + bool adminOnly() const override { return false; } + bool supportsWriteConcern(const BSONObj& cmd) const override { return false; } + std::string help() const override { return "start a logical session"; } + Status checkAuthForOperation(OperationContext* opCtx, const std::string& dbname, const BSONObj& cmdObj) const override { return Status::OK(); } - virtual bool run(OperationContext* opCtx, - const std::string& db, - const BSONObj& cmdObj, - BSONObjBuilder& result) override { - auto client = opCtx->getClient(); - ServiceContext* serviceContext = client->getServiceContext(); + bool run(OperationContext* opCtx, + const std::string& db, + const BSONObj& cmdObj, + BSONObjBuilder& result) override { + const auto service = opCtx->getServiceContext(); + const auto lsCache = LogicalSessionCache::get(service); + + auto newSessionRecord = + makeLogicalSessionRecord(opCtx, service->getFastClockSource()->now()); - auto lsCache = LogicalSessionCache::get(serviceContext); - boost::optional<LogicalSessionRecord> record = - makeLogicalSessionRecord(opCtx, lsCache->now()); - uassertStatusOK(lsCache->startSession(opCtx, record.get())); + uassertStatusOK(lsCache->startSession(opCtx, newSessionRecord)); - makeLogicalSessionToClient(record->getId()).serialize(&result); + makeLogicalSessionToClient(newSessionRecord.getId()).serialize(&result); return true; } + } startSessionCommand; +} // namespace } // namespace mongo diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index e035ee13d7f..38f323eeda5 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -35,7 +35,6 @@ #include "mongo/db/logical_session_cache_gen.h" #include "mongo/db/logical_session_cache_stats_gen.h" #include "mongo/db/logical_session_id.h" -#include "mongo/db/refresh_sessions_gen.h" namespace mongo { @@ -67,24 +66,22 @@ public: * to be the most recently used and updates its lastUse date to be the current * time. Returns an error if the session was not found. */ - virtual Status promote(LogicalSessionId lsid) = 0; + virtual Status promote(const LogicalSessionId& lsid) = 0; /** - * Inserts a new authoritative session record into the cache. This method will - * insert the authoritative record into the sessions collection. This method - * should only be used when starting new sessions and should not be used to - * insert records for existing sessions. + * Inserts a new authoritative session record into the cache. + * + * This method will insert the authoritative record into the sessions collection and should only + * be used when starting new sessions. It should not be used to insert records for existing + * sessions. */ - virtual Status startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0; + virtual Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) = 0; /** - * Refresh the given sessions. Updates the timestamps of these records in - * the local cache. + * Refresh the given sessions. Updates the timestamps of these records in the local cache. */ virtual Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClient& cmd) = 0; - virtual Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClusterMember& cmd) = 0; + const std::vector<LogicalSessionFromClient>& sessions) = 0; /** * Vivifies the session in the cache. I.e. creates it if it isn't there, updates last use if it @@ -98,8 +95,8 @@ public: virtual void endSessions(const LogicalSessionIdSet& lsids) = 0; /** - * Refreshes the cache synchronously. This flushes all pending refreshes and - * inserts to the sessions collection. + * Refreshes the cache synchronously. This flushes all pending refreshes and inserts to the + * sessions collection. */ virtual Status refreshNow(Client* client) = 0; @@ -109,11 +106,6 @@ public: virtual Status reapNow(Client* client) = 0; /** - * Returns the current time. - */ - virtual Date_t now() = 0; - - /** * Returns the number of session records currently in the cache. */ virtual size_t size() = 0; diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp index 2095595eb5f..c9dd14236b1 100644 --- a/src/mongo/db/logical_session_cache_impl.cpp +++ b/src/mongo/db/logical_session_cache_impl.cpp @@ -63,8 +63,8 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> : _service(std::move(service)), _sessionsColl(std::move(collection)), _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) { - _stats.setLastSessionsCollectionJobTimestamp(now()); - _stats.setLastTransactionReaperJobTimestamp(now()); + _stats.setLastSessionsCollectionJobTimestamp(_service->now()); + _stats.setLastTransactionReaperJobTimestamp(_service->now()); if (!disableLogicalSessionCacheRefresh) { _service->scheduleJob({"LogicalSessionCacheRefresh", @@ -85,7 +85,7 @@ void LogicalSessionCacheImpl::joinOnShutDown() { _service->join(); } -Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { +Status LogicalSessionCacheImpl::promote(const LogicalSessionId& lsid) { stdx::lock_guard<stdx::mutex> lk(_mutex); auto it = _activeSessions.find(lsid); if (it == _activeSessions.end()) { @@ -95,38 +95,19 @@ Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) { return Status::OK(); } -Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) { - // Add the new record to our local cache. We will insert it into the sessions collection - // the next time _refresh is called. If there is already a record in the cache for this - // session, we'll just write over it with our newer, more recent one. +Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, + const LogicalSessionRecord& record) { return _addToCache(record); } -Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClient& cmd) { - // Update the timestamps of all these records in our cache. - auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx); - for (const auto& lsid : sessions) { +Status LogicalSessionCacheImpl::refreshSessions( + OperationContext* opCtx, const std::vector<LogicalSessionFromClient>& sessions) { + // Update the timestamps of all these records in our cache + for (const auto& lsid : makeLogicalSessionIds(sessions, opCtx)) { if (!promote(lsid).isOK()) { // This is a new record, insert it. - auto addToCacheStatus = _addToCache(makeLogicalSessionRecord(opCtx, lsid, now())); - if (!addToCacheStatus.isOK()) { - return addToCacheStatus; - } - } - } - - return Status::OK(); -} - -Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClusterMember& cmd) { - // Update the timestamps of all these records in our cache. - auto records = cmd.getRefreshSessionsInternal(); - for (const auto& record : records) { - if (!promote(record.getId()).isOK()) { - // This is a new record, insert it. - auto addToCacheStatus = _addToCache(record); + auto addToCacheStatus = + _addToCache(makeLogicalSessionRecord(opCtx, lsid, _service->now())); if (!addToCacheStatus.isOK()) { return addToCacheStatus; } @@ -138,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx, Status LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { if (!promote(lsid).isOK()) { - return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())); + return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, _service->now())); } return Status::OK(); } @@ -156,10 +137,6 @@ Status LogicalSessionCacheImpl::reapNow(Client* client) { return _reap(client); } -Date_t LogicalSessionCacheImpl::now() { - return _service->now(); -} - size_t LogicalSessionCacheImpl::size() { stdx::lock_guard<stdx::mutex> lock(_mutex); return _activeSessions.size(); @@ -192,7 +169,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { _stats.setLastTransactionReaperJobEntriesCleanedUp(0); // Start the new run. - _stats.setLastTransactionReaperJobTimestamp(now()); + _stats.setLastTransactionReaperJobTimestamp(_service->now()); _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); } @@ -226,15 +203,14 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { return Status::OK(); } - numReaped = - _reapSessionsOlderThanFn(opCtx, - *_sessionsColl, - opCtx->getServiceContext()->getFastClockSource()->now() - - Minutes(gTransactionRecordMinimumLifetimeMinutes)); + numReaped = _reapSessionsOlderThanFn(opCtx, + *_sessionsColl, + _service->now() - + Minutes(gTransactionRecordMinimumLifetimeMinutes)); } catch (const DBException& ex) { { stdx::lock_guard<stdx::mutex> lk(_mutex); - auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); + auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); } @@ -243,7 +219,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) { { stdx::lock_guard<stdx::mutex> lk(_mutex); - auto millis = now() - _stats.getLastTransactionReaperJobTimestamp(); + auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp(); _stats.setLastTransactionReaperJobDurationMillis(millis.count()); _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); } @@ -263,14 +239,14 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { _stats.setLastSessionsCollectionJobCursorsClosed(0); // Start the new run. - _stats.setLastSessionsCollectionJobTimestamp(now()); + _stats.setLastSessionsCollectionJobTimestamp(_service->now()); _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1); } // This will finish timing _refresh for our stats no matter when we return. const auto timeRefreshJob = makeGuard([this] { stdx::lock_guard<stdx::mutex> lk(_mutex); - auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp(); + auto millis = _service->now() - _stats.getLastSessionsCollectionJobTimestamp(); _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); }); @@ -336,7 +312,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) { if (explicitlyEndingSessions.count(it) > 0) { continue; } - activeSessionRecords.insert(makeLogicalSessionRecord(it, now())); + activeSessionRecords.insert(makeLogicalSessionRecord(it, _service->now())); } for (const auto& it : activeSessions) { activeSessionRecords.insert(it.second); diff --git a/src/mongo/db/logical_session_cache_impl.cpp-c26ee759 b/src/mongo/db/logical_session_cache_impl.cpp-c26ee759 new file mode 100644 index 00000000000..c9dd14236b1 --- /dev/null +++ b/src/mongo/db/logical_session_cache_impl.cpp-c26ee759 @@ -0,0 +1,435 @@ +/** + * Copyright (C) 2018-present MongoDB, Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the Server Side Public License, version 1, + * as published by MongoDB, Inc. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * Server Side Public License for more details. + * + * You should have received a copy of the Server Side Public License + * along with this program. If not, see + * <http://www.mongodb.com/licensing/server-side-public-license>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the Server Side Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl + +#include "mongo/platform/basic.h" + +#include "mongo/db/logical_session_cache_impl.h" + +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/s/operation_sharding_state.h" +#include "mongo/db/service_context.h" +#include "mongo/platform/atomic_word.h" +#include "mongo/util/duration.h" +#include "mongo/util/log.h" +#include "mongo/util/periodic_runner.h" +#include "mongo/util/scopeguard.h" + +namespace mongo { + +namespace { + +void clearShardingOperationFailedStatus(OperationContext* opCtx) { + // We do not intend to immediately act upon sharding errors if we receive them during sessions + // collection operations. We will instead attempt the same operations during the next refresh + // cycle. + OperationShardingState::get(opCtx).resetShardingOperationFailedStatus(); +} + +} // namespace + +LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service, + std::shared_ptr<SessionsCollection> collection, + ReapSessionsOlderThanFn reapSessionsOlderThanFn) + : _service(std::move(service)), + _sessionsColl(std::move(collection)), + _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) { + _stats.setLastSessionsCollectionJobTimestamp(_service->now()); + _stats.setLastTransactionReaperJobTimestamp(_service->now()); + + if (!disableLogicalSessionCacheRefresh) { + _service->scheduleJob({"LogicalSessionCacheRefresh", + [this](Client* client) { _periodicRefresh(client); }, + Milliseconds(logicalSessionRefreshMillis)}); + + _service->scheduleJob({"LogicalSessionCacheReap", + [this](Client* client) { _periodicReap(client); }, + Milliseconds(logicalSessionRefreshMillis)}); + } +} + +LogicalSessionCacheImpl::~LogicalSessionCacheImpl() { + joinOnShutDown(); +} + +void LogicalSessionCacheImpl::joinOnShutDown() { + _service->join(); +} + +Status LogicalSessionCacheImpl::promote(const LogicalSessionId& lsid) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto it = _activeSessions.find(lsid); + if (it == _activeSessions.end()) { + return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; + } + + return Status::OK(); +} + +Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, + const LogicalSessionRecord& record) { + return _addToCache(record); +} + +Status LogicalSessionCacheImpl::refreshSessions( + OperationContext* opCtx, const std::vector<LogicalSessionFromClient>& sessions) { + // Update the timestamps of all these records in our cache + for (const auto& lsid : makeLogicalSessionIds(sessions, opCtx)) { + if (!promote(lsid).isOK()) { + // This is a new record, insert it. + auto addToCacheStatus = + _addToCache(makeLogicalSessionRecord(opCtx, lsid, _service->now())); + if (!addToCacheStatus.isOK()) { + return addToCacheStatus; + } + } + } + + return Status::OK(); +} + +Status LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) { + if (!promote(lsid).isOK()) { + return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, _service->now())); + } + return Status::OK(); +} + +Status LogicalSessionCacheImpl::refreshNow(Client* client) { + try { + _refresh(client); + } catch (...) { + return exceptionToStatus(); + } + return Status::OK(); +} + +Status LogicalSessionCacheImpl::reapNow(Client* client) { + return _reap(client); +} + +size_t LogicalSessionCacheImpl::size() { + stdx::lock_guard<stdx::mutex> lock(_mutex); + return _activeSessions.size(); +} + +void LogicalSessionCacheImpl::_periodicRefresh(Client* client) { + try { + _refresh(client); + } catch (...) { + log() << "Failed to refresh session cache: " << exceptionToStatus(); + } +} + +void LogicalSessionCacheImpl::_periodicReap(Client* client) { + auto res = _reap(client); + if (!res.isOK()) { + log() << "Failed to reap transaction table: " << res; + } + + return; +} + +Status LogicalSessionCacheImpl::_reap(Client* client) { + // Take the lock to update some stats. + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Clear the last set of stats for our new run. + _stats.setLastTransactionReaperJobDurationMillis(0); + _stats.setLastTransactionReaperJobEntriesCleanedUp(0); + + // Start the new run. + _stats.setLastTransactionReaperJobTimestamp(_service->now()); + _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1); + } + + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + + int numReaped = 0; + + try { + ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); }); + + auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx); + if (!existsStatus.isOK()) { + StringData notSetUpWarning = + "Sessions collection is not set up; " + "waiting until next sessions reap interval"; + if (existsStatus.code() != ErrorCodes::NamespaceNotFound || + existsStatus.code() != ErrorCodes::NamespaceNotSharded) { + log() << notSetUpWarning << ": " << existsStatus.reason(); + } else { + log() << notSetUpWarning; + } + + return Status::OK(); + } + + numReaped = _reapSessionsOlderThanFn(opCtx, + *_sessionsColl, + _service->now() - + Minutes(gTransactionRecordMinimumLifetimeMinutes)); + } catch (const DBException& ex) { + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp(); + _stats.setLastTransactionReaperJobDurationMillis(millis.count()); + } + + return ex.toStatus(); + } + + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp(); + _stats.setLastTransactionReaperJobDurationMillis(millis.count()); + _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped); + } + + return Status::OK(); +} + +void LogicalSessionCacheImpl::_refresh(Client* client) { + // Stats for serverStatus: + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Clear the refresh-related stats with the beginning of our run. + _stats.setLastSessionsCollectionJobDurationMillis(0); + _stats.setLastSessionsCollectionJobEntriesRefreshed(0); + _stats.setLastSessionsCollectionJobEntriesEnded(0); + _stats.setLastSessionsCollectionJobCursorsClosed(0); + + // Start the new run. + _stats.setLastSessionsCollectionJobTimestamp(_service->now()); + _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1); + } + + // This will finish timing _refresh for our stats no matter when we return. + const auto timeRefreshJob = makeGuard([this] { + stdx::lock_guard<stdx::mutex> lk(_mutex); + auto millis = _service->now() - _stats.getLastSessionsCollectionJobTimestamp(); + _stats.setLastSessionsCollectionJobDurationMillis(millis.count()); + }); + + // get or make an opCtx + boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx; + auto* const opCtx = [&client, &uniqueCtx] { + if (client->getOperationContext()) { + return client->getOperationContext(); + } + + uniqueCtx.emplace(client->makeOperationContext()); + return uniqueCtx->get(); + }(); + + ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); }); + + auto setupStatus = _sessionsColl->setupSessionsCollection(opCtx); + + if (!setupStatus.isOK()) { + log() << "Sessions collection is not set up; " + << "waiting until next sessions refresh interval: " << setupStatus.reason(); + return; + } + + LogicalSessionIdSet staleSessions; + LogicalSessionIdSet explicitlyEndingSessions; + LogicalSessionIdMap<LogicalSessionRecord> activeSessions; + + { + using std::swap; + stdx::lock_guard<stdx::mutex> lk(_mutex); + swap(explicitlyEndingSessions, _endingSessions); + swap(activeSessions, _activeSessions); + } + + // Create guards that in the case of a exception replace the ending or active sessions that + // swapped out of LogicalSessionCache, and merges in any records that had been added since we + // swapped them out. + auto backSwap = [this](auto& member, auto& temp) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + using std::swap; + swap(member, temp); + for (const auto& it : temp) { + member.emplace(it); + } + }; + auto activeSessionsBackSwapper = makeGuard([&] { backSwap(_activeSessions, activeSessions); }); + auto explicitlyEndingBackSwaper = + makeGuard([&] { backSwap(_endingSessions, explicitlyEndingSessions); }); + + // remove all explicitlyEndingSessions from activeSessions + for (const auto& lsid : explicitlyEndingSessions) { + activeSessions.erase(lsid); + } + + // Refresh all recently active sessions as well as for sessions attached to running ops + LogicalSessionRecordSet activeSessionRecords; + + auto runningOpSessions = _service->getActiveOpSessions(); + + for (const auto& it : runningOpSessions) { + // if a running op is the cause of an upsert, we won't have a user name for the record + if (explicitlyEndingSessions.count(it) > 0) { + continue; + } + activeSessionRecords.insert(makeLogicalSessionRecord(it, _service->now())); + } + for (const auto& it : activeSessions) { + activeSessionRecords.insert(it.second); + } + + // Refresh the active sessions in the sessions collection. + uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords)); + activeSessionsBackSwapper.dismiss(); + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size()); + } + + // Remove the ending sessions from the sessions collection. + uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions)); + explicitlyEndingBackSwaper.dismiss(); + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size()); + } + + // Find which running, but not recently active sessions, are expired, and add them + // to the list of sessions to kill cursors for + + KillAllSessionsByPatternSet patterns; + + auto openCursorSessions = _service->getOpenCursorSessions(opCtx); + // Exclude sessions added to _activeSessions from the openCursorSession to avoid race between + // killing cursors on the removed sessions and creating sessions. + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + for (const auto& it : _activeSessions) { + auto newSessionIt = openCursorSessions.find(it.first); + if (newSessionIt != openCursorSessions.end()) { + openCursorSessions.erase(newSessionIt); + } + } + } + + // think about pruning ending and active out of openCursorSessions + auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions); + + if (statusAndRemovedSessions.isOK()) { + auto removedSessions = statusAndRemovedSessions.getValue(); + for (const auto& lsid : removedSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); + } + } else { + // Ignore errors. + } + + // Add all of the explicitly ended sessions to the list of sessions to kill cursors for. + for (const auto& lsid : explicitlyEndingSessions) { + patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid)); + } + + SessionKiller::Matcher matcher(std::move(patterns)); + auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)); + { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second); + } +} + +void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _endingSessions.insert(begin(sessions), end(sessions)); +} + +LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _stats.setActiveSessionsCount(_activeSessions.size()); + return _stats; +} + +Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) { + return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"}; + } + + _activeSessions.insert(std::make_pair(record.getId(), record)); + return Status::OK(); +} + +std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::vector<LogicalSessionId> ret; + ret.reserve(_activeSessions.size()); + for (const auto& id : _activeSessions) { + ret.push_back(id.first); + } + return ret; +} + +std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds( + const std::vector<SHA256Block>& userDigests) const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + std::vector<LogicalSessionId> ret; + for (const auto& it : _activeSessions) { + if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) != + userDigests.cend()) { + ret.push_back(it.first); + } + } + return ret; +} + +boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached( + const LogicalSessionId& id) const { + stdx::lock_guard<stdx::mutex> lk(_mutex); + const auto it = _activeSessions.find(id); + if (it == _activeSessions.end()) { + return boost::none; + } + return it->second; +} + +} // namespace mongo diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h index 96c7f80d717..956a7924023 100644 --- a/src/mongo/db/logical_session_cache_impl.h +++ b/src/mongo/db/logical_session_cache_impl.h @@ -66,14 +66,12 @@ public: void joinOnShutDown() override; - Status promote(LogicalSessionId lsid) override; + Status promote(const LogicalSessionId& lsid) override; - Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override; + Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) override; Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClient& cmd) override; - Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClusterMember& cmd) override; + const std::vector<LogicalSessionFromClient>& sessions) override; Status vivify(OperationContext* opCtx, const LogicalSessionId& lsid) override; @@ -81,8 +79,6 @@ public: Status reapNow(Client* client) override; - Date_t now() override; - size_t size() override; std::vector<LogicalSessionId> listIds() const override; diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h index 8b5bb312102..3c7b860c355 100644 --- a/src/mongo/db/logical_session_cache_noop.h +++ b/src/mongo/db/logical_session_cache_noop.h @@ -44,20 +44,16 @@ class LogicalSessionCacheNoop : public LogicalSessionCache { public: void joinOnShutDown() override {} - Status promote(LogicalSessionId lsid) override { + Status promote(const LogicalSessionId& lsid) override { return Status::OK(); } - Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override { + Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) override { return Status::OK(); } Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClient& cmd) override { - return Status::OK(); - } - Status refreshSessions(OperationContext* opCtx, - const RefreshSessionsCmdFromClusterMember& cmd) override { + const std::vector<LogicalSessionFromClient>& sessions) override { return Status::OK(); } @@ -73,10 +69,6 @@ public: return Status::OK(); } - Date_t now() override { - return Date_t::now(); - } - size_t size() override { return 0; } diff --git a/src/mongo/db/refresh_sessions.idl b/src/mongo/db/refresh_sessions.idl deleted file mode 100644 index 6bb9de2f7c5..00000000000 --- a/src/mongo/db/refresh_sessions.idl +++ /dev/null @@ -1,52 +0,0 @@ -# Copyright (C) 2018-present MongoDB, Inc. -# -# This program is free software: you can redistribute it and/or modify -# it under the terms of the Server Side Public License, version 1, -# as published by MongoDB, Inc. -# -# This program is distributed in the hope that it will be useful, -# but WITHOUT ANY WARRANTY; without even the implied warranty of -# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -# Server Side Public License for more details. -# -# You should have received a copy of the Server Side Public License -# along with this program. If not, see -# <http://www.mongodb.com/licensing/server-side-public-license>. -# -# As a special exception, the copyright holders give permission to link the -# code of portions of this program with the OpenSSL library under certain -# conditions as described in each individual source file and distribute -# linked combinations including the program with the OpenSSL library. You -# must comply with the Server Side Public License in all respects for -# all of the code used other than as permitted herein. If you modify file(s) -# with this exception, you may extend this exception to your version of the -# file(s), but you are not obligated to do so. If you do not wish to do so, -# delete this exception statement from your version. If you delete this -# exception statement from all source files in the program, then also delete -# it in the license file. -# - -# This IDL file describes the BSON format for a LogicalSessionId, and -# handles the serialization to and deserialization from its BSON representation -# for that class. - -global: - cpp_namespace: "mongo" - -imports: - - "mongo/idl/basic_types.idl" - - "mongo/db/logical_session_id.idl" - -structs: - - RefreshSessionsCmdFromClient: - description: "A struct representing a refreshSessions command from a client" - strict: false - fields: - refreshSessions: array<LogicalSessionFromClient> - - RefreshSessionsCmdFromClusterMember: - description: "A struct representing a refreshSessions command from a cluster member" - strict: false - fields: - refreshSessionsInternal: array<LogicalSessionRecord> diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 6421302ab28..37e1749fefe 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -39,7 +39,6 @@ #include "mongo/db/create_indexes_gen.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/ops/write_ops.h" -#include "mongo/db/refresh_sessions_gen.h" #include "mongo/rpc/get_status_from_command_result.h" #include "mongo/s/write_ops/batched_command_response.h" #include "mongo/stdx/functional.h" diff --git a/src/mongo/s/commands/cluster_create_cmd.cpp b/src/mongo/s/commands/cluster_create_cmd.cpp index a87b2522687..a5c71dcb1e0 100644 --- a/src/mongo/s/commands/cluster_create_cmd.cpp +++ b/src/mongo/s/commands/cluster_create_cmd.cpp @@ -92,14 +92,14 @@ public: configCreateCmd.setOptions(optionsBuilder.obj()); } - auto response = - Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts( - opCtx, - ReadPreferenceSetting{ReadPreference::PrimaryOnly}, - "admin", - CommandHelpers::appendMajorityWriteConcern( - CommandHelpers::appendPassthroughFields(cmdObj, configCreateCmd.toBSON({}))), - Shard::RetryPolicy::kIdempotent); + const auto shardRegistry = Grid::get(opCtx)->shardRegistry(); + auto response = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts( + opCtx, + ReadPreferenceSetting{ReadPreference::PrimaryOnly}, + "admin", + CommandHelpers::appendMajorityWriteConcern( + CommandHelpers::appendPassthroughFields(cmdObj, configCreateCmd.toBSON({}))), + Shard::RetryPolicy::kIdempotent); uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response)); return true; |