summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-16 17:42:25 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2019-05-21 17:42:12 -0400
commit65656c4bc9390c52205b73451abff0b7b6b74396 (patch)
tree6892a9719c44c38af42a6ecabc0984a78fe753cb /src
parent107dd43e99f191c541c0c0faab40ff0d9bc47550 (diff)
downloadmongo-65656c4bc9390c52205b73451abff0b7b6b74396.tar.gz
SERVER-41193 Get rid of the `refreshSessionsInternal` command
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/SConscript1
-rw-r--r--src/mongo/db/commands/SConscript3
-rw-r--r--src/mongo/db/commands/end_sessions_command.cpp13
-rw-r--r--src/mongo/db/commands/refresh_sessions_command.cpp11
-rw-r--r--src/mongo/db/commands/refresh_sessions_command_internal.cpp89
-rw-r--r--src/mongo/db/commands/sessions_commands.idl (renamed from src/mongo/db/commands/end_sessions.idl)12
-rw-r--r--src/mongo/db/commands/start_session_command.cpp29
-rw-r--r--src/mongo/db/logical_session_cache.h30
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp68
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp-c26ee759435
-rw-r--r--src/mongo/db/logical_session_cache_impl.h10
-rw-r--r--src/mongo/db/logical_session_cache_noop.h14
-rw-r--r--src/mongo/db/refresh_sessions.idl52
-rw-r--r--src/mongo/db/sessions_collection.cpp1
-rw-r--r--src/mongo/s/commands/cluster_create_cmd.cpp16
15 files changed, 527 insertions, 257 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index de3ecb0da52..8d789440a4f 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -1319,7 +1319,6 @@ env.Library(
source=[
'logical_session_id.cpp',
env.Idlc('logical_session_id.idl')[0],
- env.Idlc('refresh_sessions.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/base',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 2d1be53c64f..ef2e9fa712c 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -106,8 +106,8 @@ env.Library(
'refresh_sessions_command.cpp',
'rename_collection_common.cpp',
'start_session_command.cpp',
- env.Idlc('end_sessions.idl')[0],
env.Idlc('parameters.idl')[0],
+ env.Idlc('sessions_commands.idl')[0],
],
LIBDEPS_PRIVATE=[
'$BUILD_DIR/mongo/bson/mutable/mutable_bson',
@@ -143,7 +143,6 @@ env.Library(
'logical_session_server_status_section.cpp',
'mr_common.cpp',
'reap_logical_session_cache_now.cpp',
- 'refresh_sessions_command_internal.cpp',
'traffic_recording_cmds.cpp',
'user_management_commands_common.cpp',
env.Idlc('drop_connections.idl')[0],
diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp
index b8325015cf1..15c19f907d8 100644
--- a/src/mongo/db/commands/end_sessions_command.cpp
+++ b/src/mongo/db/commands/end_sessions_command.cpp
@@ -33,7 +33,7 @@
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
-#include "mongo/db/commands/end_sessions_gen.h"
+#include "mongo/db/commands/sessions_commands_gen.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
@@ -51,15 +51,19 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
+
bool adminOnly() const override {
return false;
}
+
bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
+
std::string help() const override {
return "end a set of logical sessions";
}
+
Status checkAuthForOperation(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj) const override {
@@ -79,11 +83,12 @@ public:
const std::string& db,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
- auto lsCache = LogicalSessionCache::get(opCtx);
+ auto endSessionsRequest = EndSessionsCmdFromClient::parse(
+ IDLParserErrorContext("EndSessionsCmdFromClient"), cmdObj);
- auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj);
+ LogicalSessionCache::get(opCtx)->endSessions(
+ makeLogicalSessionIds(endSessionsRequest.getSessions(), opCtx));
- lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx));
return true;
}
diff --git a/src/mongo/db/commands/refresh_sessions_command.cpp b/src/mongo/db/commands/refresh_sessions_command.cpp
index 8e2c663e6cc..a74ddc04589 100644
--- a/src/mongo/db/commands/refresh_sessions_command.cpp
+++ b/src/mongo/db/commands/refresh_sessions_command.cpp
@@ -29,15 +29,14 @@
#include "mongo/platform/basic.h"
-#include "mongo/base/init.h"
#include "mongo/db/auth/authorization_session.h"
#include "mongo/db/client.h"
#include "mongo/db/commands.h"
+#include "mongo/db/commands/sessions_commands_gen.h"
#include "mongo/db/jsobj.h"
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
-#include "mongo/db/refresh_sessions_gen.h"
namespace mongo {
namespace {
@@ -81,9 +80,11 @@ public:
const std::string& db,
const BSONObj& cmdObj,
BSONObjBuilder& result) override {
- IDLParserErrorContext ctx("RefreshSessionsCmdFromClient");
- auto cmd = RefreshSessionsCmdFromClient::parse(ctx, cmdObj);
- uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions(opCtx, cmd));
+ auto refreshSessionsRequest = RefreshSessionsCmdFromClient::parse(
+ IDLParserErrorContext("RefreshSessionsCmdFromClient"), cmdObj);
+
+ uassertStatusOK(LogicalSessionCache::get(opCtx)->refreshSessions(
+ opCtx, refreshSessionsRequest.getSessions()));
return true;
}
diff --git a/src/mongo/db/commands/refresh_sessions_command_internal.cpp b/src/mongo/db/commands/refresh_sessions_command_internal.cpp
deleted file mode 100644
index aa3ebb4b194..00000000000
--- a/src/mongo/db/commands/refresh_sessions_command_internal.cpp
+++ /dev/null
@@ -1,89 +0,0 @@
-/**
- * Copyright (C) 2018-present MongoDB, Inc.
- *
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the Server Side Public License, version 1,
- * as published by MongoDB, Inc.
- *
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * Server Side Public License for more details.
- *
- * You should have received a copy of the Server Side Public License
- * along with this program. If not, see
- * <http://www.mongodb.com/licensing/server-side-public-license>.
- *
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the Server Side Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
- */
-
-#include "mongo/platform/basic.h"
-
-#include "mongo/base/init.h"
-#include "mongo/db/auth/authorization_session.h"
-#include "mongo/db/client.h"
-#include "mongo/db/commands.h"
-#include "mongo/db/jsobj.h"
-#include "mongo/db/logical_session_cache.h"
-#include "mongo/db/logical_session_id_helpers.h"
-#include "mongo/db/operation_context.h"
-#include "mongo/db/refresh_sessions_gen.h"
-
-namespace mongo {
-
-class RefreshSessionsCommandInternal final : public BasicCommand {
- RefreshSessionsCommandInternal(const RefreshSessionsCommandInternal&) = delete;
- RefreshSessionsCommandInternal& operator=(const RefreshSessionsCommandInternal&) = delete;
-
-public:
- RefreshSessionsCommandInternal() : BasicCommand("refreshSessionsInternal") {}
-
- AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
- return AllowedOnSecondary::kAlways;
- }
- bool adminOnly() const override {
- return false;
- }
- bool supportsWriteConcern(const BSONObj& cmd) const override {
- return false;
- }
- std::string help() const override {
- return "renew a set of logical sessions";
- }
- Status checkAuthForOperation(OperationContext* opCtx,
- const std::string& dbname,
- const BSONObj& cmdObj) const override {
- // Must be authenticated as an internal cluster member.
- auto authSession = AuthorizationSession::get(opCtx->getClient());
- if (!authSession->isAuthorizedForPrivilege(
- Privilege(ResourcePattern::forClusterResource(), ActionType::impersonate))) {
- return {ErrorCodes::Unauthorized, "unauthorized"};
- }
- return Status::OK();
- }
-
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
- IDLParserErrorContext ctx("RefreshSessionsCmdFromClusterMember");
- auto cmd = RefreshSessionsCmdFromClusterMember::parse(ctx, cmdObj);
- auto res =
- LogicalSessionCache::get(opCtx->getServiceContext())->refreshSessions(opCtx, cmd);
- uassertStatusOK(res);
-
- return true;
- }
-} refreshSessionsCommandInternal;
-
-} // namespace mongo
diff --git a/src/mongo/db/commands/end_sessions.idl b/src/mongo/db/commands/sessions_commands.idl
index 2dd7cfceffb..706537fff79 100644
--- a/src/mongo/db/commands/end_sessions.idl
+++ b/src/mongo/db/commands/sessions_commands.idl
@@ -38,4 +38,14 @@ structs:
description: "A struct representing an endSessions command from a client"
strict: false
fields:
- endSessions: array<LogicalSessionFromClient>
+ endSessions:
+ type: array<LogicalSessionFromClient>
+ cpp_name: sessions
+
+ RefreshSessionsCmdFromClient:
+ description: "A struct representing a refreshSessions command from a client"
+ strict: false
+ fields:
+ refreshSessions:
+ type: array<LogicalSessionFromClient>
+ cpp_name: sessions
diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp
index a9e64c9d5c4..02293532a34 100644
--- a/src/mongo/db/commands/start_session_command.cpp
+++ b/src/mongo/db/commands/start_session_command.cpp
@@ -45,6 +45,7 @@
#include "mongo/db/stats/top.h"
namespace mongo {
+namespace {
class StartSessionCommand final : public BasicCommand {
StartSessionCommand(const StartSessionCommand&) = delete;
@@ -56,37 +57,43 @@ public:
AllowedOnSecondary secondaryAllowed(ServiceContext*) const override {
return AllowedOnSecondary::kAlways;
}
+
bool adminOnly() const override {
return false;
}
+
bool supportsWriteConcern(const BSONObj& cmd) const override {
return false;
}
+
std::string help() const override {
return "start a logical session";
}
+
Status checkAuthForOperation(OperationContext* opCtx,
const std::string& dbname,
const BSONObj& cmdObj) const override {
return Status::OK();
}
- virtual bool run(OperationContext* opCtx,
- const std::string& db,
- const BSONObj& cmdObj,
- BSONObjBuilder& result) override {
- auto client = opCtx->getClient();
- ServiceContext* serviceContext = client->getServiceContext();
+ bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+ const auto service = opCtx->getServiceContext();
+ const auto lsCache = LogicalSessionCache::get(service);
+
+ auto newSessionRecord =
+ makeLogicalSessionRecord(opCtx, service->getFastClockSource()->now());
- auto lsCache = LogicalSessionCache::get(serviceContext);
- boost::optional<LogicalSessionRecord> record =
- makeLogicalSessionRecord(opCtx, lsCache->now());
- uassertStatusOK(lsCache->startSession(opCtx, record.get()));
+ uassertStatusOK(lsCache->startSession(opCtx, newSessionRecord));
- makeLogicalSessionToClient(record->getId()).serialize(&result);
+ makeLogicalSessionToClient(newSessionRecord.getId()).serialize(&result);
return true;
}
+
} startSessionCommand;
+} // namespace
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index e035ee13d7f..38f323eeda5 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -35,7 +35,6 @@
#include "mongo/db/logical_session_cache_gen.h"
#include "mongo/db/logical_session_cache_stats_gen.h"
#include "mongo/db/logical_session_id.h"
-#include "mongo/db/refresh_sessions_gen.h"
namespace mongo {
@@ -67,24 +66,22 @@ public:
* to be the most recently used and updates its lastUse date to be the current
* time. Returns an error if the session was not found.
*/
- virtual Status promote(LogicalSessionId lsid) = 0;
+ virtual Status promote(const LogicalSessionId& lsid) = 0;
/**
- * Inserts a new authoritative session record into the cache. This method will
- * insert the authoritative record into the sessions collection. This method
- * should only be used when starting new sessions and should not be used to
- * insert records for existing sessions.
+ * Inserts a new authoritative session record into the cache.
+ *
+ * This method will insert the authoritative record into the sessions collection and should only
+ * be used when starting new sessions. It should not be used to insert records for existing
+ * sessions.
*/
- virtual Status startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0;
+ virtual Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) = 0;
/**
- * Refresh the given sessions. Updates the timestamps of these records in
- * the local cache.
+ * Refresh the given sessions. Updates the timestamps of these records in the local cache.
*/
virtual Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClient& cmd) = 0;
- virtual Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClusterMember& cmd) = 0;
+ const std::vector<LogicalSessionFromClient>& sessions) = 0;
/**
* Vivifies the session in the cache. I.e. creates it if it isn't there, updates last use if it
@@ -98,8 +95,8 @@ public:
virtual void endSessions(const LogicalSessionIdSet& lsids) = 0;
/**
- * Refreshes the cache synchronously. This flushes all pending refreshes and
- * inserts to the sessions collection.
+ * Refreshes the cache synchronously. This flushes all pending refreshes and inserts to the
+ * sessions collection.
*/
virtual Status refreshNow(Client* client) = 0;
@@ -109,11 +106,6 @@ public:
virtual Status reapNow(Client* client) = 0;
/**
- * Returns the current time.
- */
- virtual Date_t now() = 0;
-
- /**
* Returns the number of session records currently in the cache.
*/
virtual size_t size() = 0;
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index 2095595eb5f..c9dd14236b1 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -63,8 +63,8 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison>
: _service(std::move(service)),
_sessionsColl(std::move(collection)),
_reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
- _stats.setLastSessionsCollectionJobTimestamp(now());
- _stats.setLastTransactionReaperJobTimestamp(now());
+ _stats.setLastSessionsCollectionJobTimestamp(_service->now());
+ _stats.setLastTransactionReaperJobTimestamp(_service->now());
if (!disableLogicalSessionCacheRefresh) {
_service->scheduleJob({"LogicalSessionCacheRefresh",
@@ -85,7 +85,7 @@ void LogicalSessionCacheImpl::joinOnShutDown() {
_service->join();
}
-Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
+Status LogicalSessionCacheImpl::promote(const LogicalSessionId& lsid) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
auto it = _activeSessions.find(lsid);
if (it == _activeSessions.end()) {
@@ -95,38 +95,19 @@ Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
return Status::OK();
}
-Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
- // Add the new record to our local cache. We will insert it into the sessions collection
- // the next time _refresh is called. If there is already a record in the cache for this
- // session, we'll just write over it with our newer, more recent one.
+Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx,
+ const LogicalSessionRecord& record) {
return _addToCache(record);
}
-Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClient& cmd) {
- // Update the timestamps of all these records in our cache.
- auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx);
- for (const auto& lsid : sessions) {
+Status LogicalSessionCacheImpl::refreshSessions(
+ OperationContext* opCtx, const std::vector<LogicalSessionFromClient>& sessions) {
+ // Update the timestamps of all these records in our cache
+ for (const auto& lsid : makeLogicalSessionIds(sessions, opCtx)) {
if (!promote(lsid).isOK()) {
// This is a new record, insert it.
- auto addToCacheStatus = _addToCache(makeLogicalSessionRecord(opCtx, lsid, now()));
- if (!addToCacheStatus.isOK()) {
- return addToCacheStatus;
- }
- }
- }
-
- return Status::OK();
-}
-
-Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClusterMember& cmd) {
- // Update the timestamps of all these records in our cache.
- auto records = cmd.getRefreshSessionsInternal();
- for (const auto& record : records) {
- if (!promote(record.getId()).isOK()) {
- // This is a new record, insert it.
- auto addToCacheStatus = _addToCache(record);
+ auto addToCacheStatus =
+ _addToCache(makeLogicalSessionRecord(opCtx, lsid, _service->now()));
if (!addToCacheStatus.isOK()) {
return addToCacheStatus;
}
@@ -138,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
Status LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) {
if (!promote(lsid).isOK()) {
- return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now()));
+ return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, _service->now()));
}
return Status::OK();
}
@@ -156,10 +137,6 @@ Status LogicalSessionCacheImpl::reapNow(Client* client) {
return _reap(client);
}
-Date_t LogicalSessionCacheImpl::now() {
- return _service->now();
-}
-
size_t LogicalSessionCacheImpl::size() {
stdx::lock_guard<stdx::mutex> lock(_mutex);
return _activeSessions.size();
@@ -192,7 +169,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
_stats.setLastTransactionReaperJobEntriesCleanedUp(0);
// Start the new run.
- _stats.setLastTransactionReaperJobTimestamp(now());
+ _stats.setLastTransactionReaperJobTimestamp(_service->now());
_stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
}
@@ -226,15 +203,14 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
- numReaped =
- _reapSessionsOlderThanFn(opCtx,
- *_sessionsColl,
- opCtx->getServiceContext()->getFastClockSource()->now() -
- Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ numReaped = _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ _service->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
} catch (const DBException& ex) {
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
+ auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
}
@@ -243,7 +219,7 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
{
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto millis = now() - _stats.getLastTransactionReaperJobTimestamp();
+ auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp();
_stats.setLastTransactionReaperJobDurationMillis(millis.count());
_stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
}
@@ -263,14 +239,14 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
_stats.setLastSessionsCollectionJobCursorsClosed(0);
// Start the new run.
- _stats.setLastSessionsCollectionJobTimestamp(now());
+ _stats.setLastSessionsCollectionJobTimestamp(_service->now());
_stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1);
}
// This will finish timing _refresh for our stats no matter when we return.
const auto timeRefreshJob = makeGuard([this] {
stdx::lock_guard<stdx::mutex> lk(_mutex);
- auto millis = now() - _stats.getLastSessionsCollectionJobTimestamp();
+ auto millis = _service->now() - _stats.getLastSessionsCollectionJobTimestamp();
_stats.setLastSessionsCollectionJobDurationMillis(millis.count());
});
@@ -336,7 +312,7 @@ void LogicalSessionCacheImpl::_refresh(Client* client) {
if (explicitlyEndingSessions.count(it) > 0) {
continue;
}
- activeSessionRecords.insert(makeLogicalSessionRecord(it, now()));
+ activeSessionRecords.insert(makeLogicalSessionRecord(it, _service->now()));
}
for (const auto& it : activeSessions) {
activeSessionRecords.insert(it.second);
diff --git a/src/mongo/db/logical_session_cache_impl.cpp-c26ee759 b/src/mongo/db/logical_session_cache_impl.cpp-c26ee759
new file mode 100644
index 00000000000..c9dd14236b1
--- /dev/null
+++ b/src/mongo/db/logical_session_cache_impl.cpp-c26ee759
@@ -0,0 +1,435 @@
+/**
+ * Copyright (C) 2018-present MongoDB, Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the Server Side Public License, version 1,
+ * as published by MongoDB, Inc.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * Server Side Public License for more details.
+ *
+ * You should have received a copy of the Server Side Public License
+ * along with this program. If not, see
+ * <http://www.mongodb.com/licensing/server-side-public-license>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the Server Side Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#define MONGO_LOG_DEFAULT_COMPONENT ::mongo::logger::LogComponent::kControl
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/logical_session_cache_impl.h"
+
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/s/operation_sharding_state.h"
+#include "mongo/db/service_context.h"
+#include "mongo/platform/atomic_word.h"
+#include "mongo/util/duration.h"
+#include "mongo/util/log.h"
+#include "mongo/util/periodic_runner.h"
+#include "mongo/util/scopeguard.h"
+
+namespace mongo {
+
+namespace {
+
+void clearShardingOperationFailedStatus(OperationContext* opCtx) {
+ // We do not intend to immediately act upon sharding errors if we receive them during sessions
+ // collection operations. We will instead attempt the same operations during the next refresh
+ // cycle.
+ OperationShardingState::get(opCtx).resetShardingOperationFailedStatus();
+}
+
+} // namespace
+
+LogicalSessionCacheImpl::LogicalSessionCacheImpl(std::unique_ptr<ServiceLiaison> service,
+ std::shared_ptr<SessionsCollection> collection,
+ ReapSessionsOlderThanFn reapSessionsOlderThanFn)
+ : _service(std::move(service)),
+ _sessionsColl(std::move(collection)),
+ _reapSessionsOlderThanFn(std::move(reapSessionsOlderThanFn)) {
+ _stats.setLastSessionsCollectionJobTimestamp(_service->now());
+ _stats.setLastTransactionReaperJobTimestamp(_service->now());
+
+ if (!disableLogicalSessionCacheRefresh) {
+ _service->scheduleJob({"LogicalSessionCacheRefresh",
+ [this](Client* client) { _periodicRefresh(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
+
+ _service->scheduleJob({"LogicalSessionCacheReap",
+ [this](Client* client) { _periodicReap(client); },
+ Milliseconds(logicalSessionRefreshMillis)});
+ }
+}
+
+LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
+ joinOnShutDown();
+}
+
+void LogicalSessionCacheImpl::joinOnShutDown() {
+ _service->join();
+}
+
+Status LogicalSessionCacheImpl::promote(const LogicalSessionId& lsid) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto it = _activeSessions.find(lsid);
+ if (it == _activeSessions.end()) {
+ return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
+ }
+
+ return Status::OK();
+}
+
+Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx,
+ const LogicalSessionRecord& record) {
+ return _addToCache(record);
+}
+
+Status LogicalSessionCacheImpl::refreshSessions(
+ OperationContext* opCtx, const std::vector<LogicalSessionFromClient>& sessions) {
+ // Update the timestamps of all these records in our cache
+ for (const auto& lsid : makeLogicalSessionIds(sessions, opCtx)) {
+ if (!promote(lsid).isOK()) {
+ // This is a new record, insert it.
+ auto addToCacheStatus =
+ _addToCache(makeLogicalSessionRecord(opCtx, lsid, _service->now()));
+ if (!addToCacheStatus.isOK()) {
+ return addToCacheStatus;
+ }
+ }
+ }
+
+ return Status::OK();
+}
+
+Status LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) {
+ if (!promote(lsid).isOK()) {
+ return startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, _service->now()));
+ }
+ return Status::OK();
+}
+
+Status LogicalSessionCacheImpl::refreshNow(Client* client) {
+ try {
+ _refresh(client);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ return Status::OK();
+}
+
+Status LogicalSessionCacheImpl::reapNow(Client* client) {
+ return _reap(client);
+}
+
+size_t LogicalSessionCacheImpl::size() {
+ stdx::lock_guard<stdx::mutex> lock(_mutex);
+ return _activeSessions.size();
+}
+
+void LogicalSessionCacheImpl::_periodicRefresh(Client* client) {
+ try {
+ _refresh(client);
+ } catch (...) {
+ log() << "Failed to refresh session cache: " << exceptionToStatus();
+ }
+}
+
+void LogicalSessionCacheImpl::_periodicReap(Client* client) {
+ auto res = _reap(client);
+ if (!res.isOK()) {
+ log() << "Failed to reap transaction table: " << res;
+ }
+
+ return;
+}
+
+Status LogicalSessionCacheImpl::_reap(Client* client) {
+ // Take the lock to update some stats.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Clear the last set of stats for our new run.
+ _stats.setLastTransactionReaperJobDurationMillis(0);
+ _stats.setLastTransactionReaperJobEntriesCleanedUp(0);
+
+ // Start the new run.
+ _stats.setLastTransactionReaperJobTimestamp(_service->now());
+ _stats.setTransactionReaperJobCount(_stats.getTransactionReaperJobCount() + 1);
+ }
+
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+
+ int numReaped = 0;
+
+ try {
+ ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
+
+ auto existsStatus = _sessionsColl->checkSessionsCollectionExists(opCtx);
+ if (!existsStatus.isOK()) {
+ StringData notSetUpWarning =
+ "Sessions collection is not set up; "
+ "waiting until next sessions reap interval";
+ if (existsStatus.code() != ErrorCodes::NamespaceNotFound ||
+ existsStatus.code() != ErrorCodes::NamespaceNotSharded) {
+ log() << notSetUpWarning << ": " << existsStatus.reason();
+ } else {
+ log() << notSetUpWarning;
+ }
+
+ return Status::OK();
+ }
+
+ numReaped = _reapSessionsOlderThanFn(opCtx,
+ *_sessionsColl,
+ _service->now() -
+ Minutes(gTransactionRecordMinimumLifetimeMinutes));
+ } catch (const DBException& ex) {
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp();
+ _stats.setLastTransactionReaperJobDurationMillis(millis.count());
+ }
+
+ return ex.toStatus();
+ }
+
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto millis = _service->now() - _stats.getLastTransactionReaperJobTimestamp();
+ _stats.setLastTransactionReaperJobDurationMillis(millis.count());
+ _stats.setLastTransactionReaperJobEntriesCleanedUp(numReaped);
+ }
+
+ return Status::OK();
+}
+
+void LogicalSessionCacheImpl::_refresh(Client* client) {
+ // Stats for serverStatus:
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Clear the refresh-related stats with the beginning of our run.
+ _stats.setLastSessionsCollectionJobDurationMillis(0);
+ _stats.setLastSessionsCollectionJobEntriesRefreshed(0);
+ _stats.setLastSessionsCollectionJobEntriesEnded(0);
+ _stats.setLastSessionsCollectionJobCursorsClosed(0);
+
+ // Start the new run.
+ _stats.setLastSessionsCollectionJobTimestamp(_service->now());
+ _stats.setSessionsCollectionJobCount(_stats.getSessionsCollectionJobCount() + 1);
+ }
+
+ // This will finish timing _refresh for our stats no matter when we return.
+ const auto timeRefreshJob = makeGuard([this] {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ auto millis = _service->now() - _stats.getLastSessionsCollectionJobTimestamp();
+ _stats.setLastSessionsCollectionJobDurationMillis(millis.count());
+ });
+
+ // get or make an opCtx
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+
+ ON_BLOCK_EXIT([&opCtx] { clearShardingOperationFailedStatus(opCtx); });
+
+ auto setupStatus = _sessionsColl->setupSessionsCollection(opCtx);
+
+ if (!setupStatus.isOK()) {
+ log() << "Sessions collection is not set up; "
+ << "waiting until next sessions refresh interval: " << setupStatus.reason();
+ return;
+ }
+
+ LogicalSessionIdSet staleSessions;
+ LogicalSessionIdSet explicitlyEndingSessions;
+ LogicalSessionIdMap<LogicalSessionRecord> activeSessions;
+
+ {
+ using std::swap;
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ swap(explicitlyEndingSessions, _endingSessions);
+ swap(activeSessions, _activeSessions);
+ }
+
+ // Create guards that in the case of a exception replace the ending or active sessions that
+ // swapped out of LogicalSessionCache, and merges in any records that had been added since we
+ // swapped them out.
+ auto backSwap = [this](auto& member, auto& temp) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ using std::swap;
+ swap(member, temp);
+ for (const auto& it : temp) {
+ member.emplace(it);
+ }
+ };
+ auto activeSessionsBackSwapper = makeGuard([&] { backSwap(_activeSessions, activeSessions); });
+ auto explicitlyEndingBackSwaper =
+ makeGuard([&] { backSwap(_endingSessions, explicitlyEndingSessions); });
+
+ // remove all explicitlyEndingSessions from activeSessions
+ for (const auto& lsid : explicitlyEndingSessions) {
+ activeSessions.erase(lsid);
+ }
+
+ // Refresh all recently active sessions as well as for sessions attached to running ops
+ LogicalSessionRecordSet activeSessionRecords;
+
+ auto runningOpSessions = _service->getActiveOpSessions();
+
+ for (const auto& it : runningOpSessions) {
+ // if a running op is the cause of an upsert, we won't have a user name for the record
+ if (explicitlyEndingSessions.count(it) > 0) {
+ continue;
+ }
+ activeSessionRecords.insert(makeLogicalSessionRecord(it, _service->now()));
+ }
+ for (const auto& it : activeSessions) {
+ activeSessionRecords.insert(it.second);
+ }
+
+ // Refresh the active sessions in the sessions collection.
+ uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
+ activeSessionsBackSwapper.dismiss();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stats.setLastSessionsCollectionJobEntriesRefreshed(activeSessionRecords.size());
+ }
+
+ // Remove the ending sessions from the sessions collection.
+ uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
+ explicitlyEndingBackSwaper.dismiss();
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stats.setLastSessionsCollectionJobEntriesEnded(explicitlyEndingSessions.size());
+ }
+
+ // Find which running, but not recently active sessions, are expired, and add them
+ // to the list of sessions to kill cursors for
+
+ KillAllSessionsByPatternSet patterns;
+
+ auto openCursorSessions = _service->getOpenCursorSessions(opCtx);
+ // Exclude sessions added to _activeSessions from the openCursorSession to avoid race between
+ // killing cursors on the removed sessions and creating sessions.
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ for (const auto& it : _activeSessions) {
+ auto newSessionIt = openCursorSessions.find(it.first);
+ if (newSessionIt != openCursorSessions.end()) {
+ openCursorSessions.erase(newSessionIt);
+ }
+ }
+ }
+
+ // think about pruning ending and active out of openCursorSessions
+ auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions);
+
+ if (statusAndRemovedSessions.isOK()) {
+ auto removedSessions = statusAndRemovedSessions.getValue();
+ for (const auto& lsid : removedSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
+ }
+ } else {
+ // Ignore errors.
+ }
+
+ // Add all of the explicitly ended sessions to the list of sessions to kill cursors for.
+ for (const auto& lsid : explicitlyEndingSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
+ }
+
+ SessionKiller::Matcher matcher(std::move(patterns));
+ auto killRes = _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher));
+ {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stats.setLastSessionsCollectionJobCursorsClosed(killRes.second);
+ }
+}
+
+void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _endingSessions.insert(begin(sessions), end(sessions));
+}
+
+LogicalSessionCacheStats LogicalSessionCacheImpl::getStats() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _stats.setActiveSessionsCount(_activeSessions.size());
+ return _stats;
+}
+
+Status LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ if (_activeSessions.size() >= static_cast<size_t>(maxSessions)) {
+ return {ErrorCodes::TooManyLogicalSessions, "cannot add session into the cache"};
+ }
+
+ _activeSessions.insert(std::make_pair(record.getId(), record));
+ return Status::OK();
+}
+
+std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::vector<LogicalSessionId> ret;
+ ret.reserve(_activeSessions.size());
+ for (const auto& id : _activeSessions) {
+ ret.push_back(id.first);
+ }
+ return ret;
+}
+
+std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
+ const std::vector<SHA256Block>& userDigests) const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ std::vector<LogicalSessionId> ret;
+ for (const auto& it : _activeSessions) {
+ if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
+ userDigests.cend()) {
+ ret.push_back(it.first);
+ }
+ }
+ return ret;
+}
+
+boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
+ const LogicalSessionId& id) const {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ const auto it = _activeSessions.find(id);
+ if (it == _activeSessions.end()) {
+ return boost::none;
+ }
+ return it->second;
+}
+
+} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 96c7f80d717..956a7924023 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -66,14 +66,12 @@ public:
void joinOnShutDown() override;
- Status promote(LogicalSessionId lsid) override;
+ Status promote(const LogicalSessionId& lsid) override;
- Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
+ Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) override;
Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClient& cmd) override;
- Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClusterMember& cmd) override;
+ const std::vector<LogicalSessionFromClient>& sessions) override;
Status vivify(OperationContext* opCtx, const LogicalSessionId& lsid) override;
@@ -81,8 +79,6 @@ public:
Status reapNow(Client* client) override;
- Date_t now() override;
-
size_t size() override;
std::vector<LogicalSessionId> listIds() const override;
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index 8b5bb312102..3c7b860c355 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -44,20 +44,16 @@ class LogicalSessionCacheNoop : public LogicalSessionCache {
public:
void joinOnShutDown() override {}
- Status promote(LogicalSessionId lsid) override {
+ Status promote(const LogicalSessionId& lsid) override {
return Status::OK();
}
- Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override {
+ Status startSession(OperationContext* opCtx, const LogicalSessionRecord& record) override {
return Status::OK();
}
Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClient& cmd) override {
- return Status::OK();
- }
- Status refreshSessions(OperationContext* opCtx,
- const RefreshSessionsCmdFromClusterMember& cmd) override {
+ const std::vector<LogicalSessionFromClient>& sessions) override {
return Status::OK();
}
@@ -73,10 +69,6 @@ public:
return Status::OK();
}
- Date_t now() override {
- return Date_t::now();
- }
-
size_t size() override {
return 0;
}
diff --git a/src/mongo/db/refresh_sessions.idl b/src/mongo/db/refresh_sessions.idl
deleted file mode 100644
index 6bb9de2f7c5..00000000000
--- a/src/mongo/db/refresh_sessions.idl
+++ /dev/null
@@ -1,52 +0,0 @@
-# Copyright (C) 2018-present MongoDB, Inc.
-#
-# This program is free software: you can redistribute it and/or modify
-# it under the terms of the Server Side Public License, version 1,
-# as published by MongoDB, Inc.
-#
-# This program is distributed in the hope that it will be useful,
-# but WITHOUT ANY WARRANTY; without even the implied warranty of
-# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
-# Server Side Public License for more details.
-#
-# You should have received a copy of the Server Side Public License
-# along with this program. If not, see
-# <http://www.mongodb.com/licensing/server-side-public-license>.
-#
-# As a special exception, the copyright holders give permission to link the
-# code of portions of this program with the OpenSSL library under certain
-# conditions as described in each individual source file and distribute
-# linked combinations including the program with the OpenSSL library. You
-# must comply with the Server Side Public License in all respects for
-# all of the code used other than as permitted herein. If you modify file(s)
-# with this exception, you may extend this exception to your version of the
-# file(s), but you are not obligated to do so. If you do not wish to do so,
-# delete this exception statement from your version. If you delete this
-# exception statement from all source files in the program, then also delete
-# it in the license file.
-#
-
-# This IDL file describes the BSON format for a LogicalSessionId, and
-# handles the serialization to and deserialization from its BSON representation
-# for that class.
-
-global:
- cpp_namespace: "mongo"
-
-imports:
- - "mongo/idl/basic_types.idl"
- - "mongo/db/logical_session_id.idl"
-
-structs:
-
- RefreshSessionsCmdFromClient:
- description: "A struct representing a refreshSessions command from a client"
- strict: false
- fields:
- refreshSessions: array<LogicalSessionFromClient>
-
- RefreshSessionsCmdFromClusterMember:
- description: "A struct representing a refreshSessions command from a cluster member"
- strict: false
- fields:
- refreshSessionsInternal: array<LogicalSessionRecord>
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index 6421302ab28..37e1749fefe 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -39,7 +39,6 @@
#include "mongo/db/create_indexes_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/ops/write_ops.h"
-#include "mongo/db/refresh_sessions_gen.h"
#include "mongo/rpc/get_status_from_command_result.h"
#include "mongo/s/write_ops/batched_command_response.h"
#include "mongo/stdx/functional.h"
diff --git a/src/mongo/s/commands/cluster_create_cmd.cpp b/src/mongo/s/commands/cluster_create_cmd.cpp
index a87b2522687..a5c71dcb1e0 100644
--- a/src/mongo/s/commands/cluster_create_cmd.cpp
+++ b/src/mongo/s/commands/cluster_create_cmd.cpp
@@ -92,14 +92,14 @@ public:
configCreateCmd.setOptions(optionsBuilder.obj());
}
- auto response =
- Grid::get(opCtx)->shardRegistry()->getConfigShard()->runCommandWithFixedRetryAttempts(
- opCtx,
- ReadPreferenceSetting{ReadPreference::PrimaryOnly},
- "admin",
- CommandHelpers::appendMajorityWriteConcern(
- CommandHelpers::appendPassthroughFields(cmdObj, configCreateCmd.toBSON({}))),
- Shard::RetryPolicy::kIdempotent);
+ const auto shardRegistry = Grid::get(opCtx)->shardRegistry();
+ auto response = shardRegistry->getConfigShard()->runCommandWithFixedRetryAttempts(
+ opCtx,
+ ReadPreferenceSetting{ReadPreference::PrimaryOnly},
+ "admin",
+ CommandHelpers::appendMajorityWriteConcern(
+ CommandHelpers::appendPassthroughFields(cmdObj, configCreateCmd.toBSON({}))),
+ Shard::RetryPolicy::kIdempotent);
uassertStatusOK(Shard::CommandResponse::getEffectiveStatus(response));
return true;