summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/core/views/views_all_commands.js9
-rw-r--r--jstests/noPassthrough/end_sessions_command.js70
-rw-r--r--jstests/sharding/safe_secondary_reads_drop_recreate.js1
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js1
-rw-r--r--jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js1
-rw-r--r--src/mongo/db/SConscript6
-rw-r--r--src/mongo/db/commands/SConscript2
-rw-r--r--src/mongo/db/commands/end_sessions.idl28
-rw-r--r--src/mongo/db/commands/end_sessions_command.cpp89
-rw-r--r--src/mongo/db/commands/start_session_command.cpp6
-rw-r--r--src/mongo/db/logical_session_cache.h8
-rw-r--r--src/mongo/db/logical_session_cache_impl.cpp216
-rw-r--r--src/mongo/db/logical_session_cache_impl.h24
-rw-r--r--src/mongo/db/logical_session_cache_noop.h6
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp314
-rw-r--r--src/mongo/db/service_liason.h14
-rw-r--r--src/mongo/db/service_liason_mock.cpp36
-rw-r--r--src/mongo/db/service_liason_mock.h28
-rw-r--r--src/mongo/db/service_liason_mongod.cpp36
-rw-r--r--src/mongo/db/service_liason_mongod.h6
-rw-r--r--src/mongo/db/service_liason_mongos.cpp14
-rw-r--r--src/mongo/db/service_liason_mongos.h6
-rw-r--r--src/mongo/db/sessions_collection.cpp20
-rw-r--r--src/mongo/db/sessions_collection.h5
-rw-r--r--src/mongo/db/sessions_collection_mock.cpp12
-rw-r--r--src/mongo/db/sessions_collection_mock.h8
-rw-r--r--src/mongo/db/sessions_collection_rs.cpp5
-rw-r--r--src/mongo/db/sessions_collection_rs.h3
-rw-r--r--src/mongo/db/sessions_collection_sharded.cpp5
-rw-r--r--src/mongo/db/sessions_collection_sharded.h3
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp4
-rw-r--r--src/mongo/db/sessions_collection_standalone.h3
-rw-r--r--src/mongo/dbtests/logical_sessions_tests.cpp17
33 files changed, 636 insertions, 370 deletions
diff --git a/jstests/core/views/views_all_commands.js b/jstests/core/views/views_all_commands.js
index b56d61fe7a7..6cee9851545 100644
--- a/jstests/core/views/views_all_commands.js
+++ b/jstests/core/views/views_all_commands.js
@@ -204,6 +204,7 @@
expectFailure: true,
},
enableSharding: {skip: "Tested as part of shardCollection"},
+ endSessions: {skip: isUnrelated},
eval: {skip: isUnrelated},
explain: {command: {explain: {count: "view"}}},
features: {skip: isUnrelated},
@@ -374,6 +375,10 @@
{command: {planCacheListQueryShapes: "view"}, expectFailure: true},
planCacheSetFilter: {command: {planCacheSetFilter: "view"}, expectFailure: true},
profile: {skip: isUnrelated},
+ refreshLogicalSessionCacheNow: {skip: isAnInternalCommand},
+ reapLogicalSessionCacheNow: {skip: isAnInternalCommand},
+ refreshSessions: {skip: isUnrelated},
+ refreshSessionsInternal: {skip: isAnInternalCommand},
reIndex: {command: {reIndex: "view"}, expectFailure: true},
removeShard: {skip: isUnrelated},
removeShardFromZone: {skip: isUnrelated},
@@ -482,10 +487,6 @@
},
stageDebug: {skip: isAnInternalCommand},
startSession: {skip: isAnInternalCommand},
- reapLogicalSessionCacheNow: {skip: isAnInternalCommand},
- refreshLogicalSessionCacheNow: {skip: isAnInternalCommand},
- refreshSessions: {skip: isUnrelated},
- refreshSessionsInternal: {skip: isAnInternalCommand},
top: {skip: "tested in views/views_stats.js"},
touch: {
command: {touch: "view", data: true},
diff --git a/jstests/noPassthrough/end_sessions_command.js b/jstests/noPassthrough/end_sessions_command.js
new file mode 100644
index 00000000000..3d6ae307a52
--- /dev/null
+++ b/jstests/noPassthrough/end_sessions_command.js
@@ -0,0 +1,70 @@
+(function() {
+ "use script";
+
+ var res;
+ var refresh = {refreshLogicalSessionCacheNow: 1};
+ var startSession = {startSession: 1};
+
+ // Start up a standalone server.
+ var conn = MongoRunner.runMongod({nojournal: ""});
+ var admin = conn.getDB("admin");
+
+ // Trigger an initial refresh, as a sanity check.
+ res = admin.runCommand(refresh);
+ assert.commandWorked(res, "failed to refresh");
+
+ var sessions = [];
+ for (var i = 0; i < 20; i++) {
+ res = admin.runCommand(startSession);
+ assert.commandWorked(res, "unable to start session");
+ sessions.push(res);
+ }
+
+ res = admin.runCommand(refresh);
+ assert.commandWorked(res, "failed to refresh");
+
+ assert.eq(admin.system.sessions.count(), 20, "refresh should have written 20 session records");
+
+ var endSessionsIds = [];
+ for (var i = 0; i < 10; i++) {
+ endSessionsIds.push(sessions[i].id);
+ }
+ res = admin.runCommand({endSessions: endSessionsIds});
+ assert.commandWorked(res, "failed to end sessions");
+
+ res = admin.runCommand(refresh);
+ assert.commandWorked(res, "failed to refresh");
+
+ assert.eq(admin.system.sessions.count(),
+ 10,
+ "endSessions and refresh should result in 10 remaining sessions");
+
+ // double delete the remaining 10
+ endSessionsIds = [];
+ for (var i = 10; i < 20; i++) {
+ endSessionsIds.push(sessions[i].id);
+ endSessionsIds.push(sessions[i].id);
+ }
+
+ res = admin.runCommand({endSessions: endSessionsIds});
+ assert.commandWorked(res, "failed to end sessions");
+
+ res = admin.runCommand(refresh);
+ assert.commandWorked(res, "failed to refresh");
+
+ assert.eq(admin.system.sessions.count(),
+ 0,
+ "endSessions and refresh should result in 0 remaining sessions");
+
+ // delete some sessions that were never created
+ res = admin.runCommand({
+ endSessions: [
+ {"id": UUID("bacb219c-214c-47f9-a94a-6c7f434b3bae")},
+ {"id": UUID("bacb219c-214c-47f9-a94a-6c7f434b3baf")}
+ ]
+ });
+
+ res = admin.runCommand(refresh);
+ assert.commandWorked(res, "failed to refresh");
+
+}());
diff --git a/jstests/sharding/safe_secondary_reads_drop_recreate.js b/jstests/sharding/safe_secondary_reads_drop_recreate.js
index 5b49fd1dc24..e8a285456f8 100644
--- a/jstests/sharding/safe_secondary_reads_drop_recreate.js
+++ b/jstests/sharding/safe_secondary_reads_drop_recreate.js
@@ -155,6 +155,7 @@
dropUser: {skip: "primary only"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
+ endSessions: {skip: "does not return user data"},
eval: {skip: "primary only"},
explain: {skip: "TODO SERVER-30068"},
features: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
index 44ac466c435..64a7a674cf7 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_suspend_range_deletion.js
@@ -179,6 +179,7 @@
dropUser: {skip: "primary only"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
+ endSessions: {skip: "does not return user data"},
eval: {skip: "primary only"},
explain: {skip: "TODO SERVER-30068"},
features: {skip: "does not return user data"},
diff --git a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
index bf09aebe9f5..4e728ba4f9a 100644
--- a/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
+++ b/jstests/sharding/safe_secondary_reads_single_migration_waitForDelete.js
@@ -157,6 +157,7 @@
dropUser: {skip: "primary only"},
emptycapped: {skip: "primary only"},
enableSharding: {skip: "primary only"},
+ endSessions: {skip: "does not return user data"},
eval: {skip: "primary only"},
explain: {skip: "TODO SERVER-30068"},
features: {skip: "does not return user data"},
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 70151071778..22074f2f637 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -949,11 +949,12 @@ envWithAsio.Library(
LIBDEPS=[
'$BUILD_DIR/mongo/executor/async_timer_mock',
'$BUILD_DIR/mongo/util/periodic_runner_asio',
+ 'kill_sessions',
'service_liason',
],
)
-env.Library(
+envWithAsio.Library(
target='service_liason_mongod',
source=[
'service_liason_mongod.cpp',
@@ -1072,6 +1073,7 @@ env.Library(
'initialize_operation_session_info.cpp',
'logical_session_cache_impl.cpp',
'logical_session_server_status_section.cpp',
+ env.Idlc('commands/end_sessions.idl')[0],
],
LIBDEPS=[
'$BUILD_DIR/mongo/db/commands/server_status',
@@ -1081,6 +1083,7 @@ env.Library(
'sessions_collection',
'server_parameters',
'service_liason',
+ 'kill_sessions',
],
)
@@ -1109,6 +1112,7 @@ envWithAsio.CppUnitTest(
LIBDEPS=[
'$BUILD_DIR/mongo/db/service_context_noop_init',
'$BUILD_DIR/mongo/executor/async_timer_mock',
+ 'auth/authmocks',
'keys_collection_manager',
'keys_collection_document',
'logical_clock',
diff --git a/src/mongo/db/commands/SConscript b/src/mongo/db/commands/SConscript
index 99cb82a1298..31f8600cd16 100644
--- a/src/mongo/db/commands/SConscript
+++ b/src/mongo/db/commands/SConscript
@@ -57,6 +57,7 @@ env.Library(
"conn_pool_sync.cpp",
"connection_status.cpp",
"copydb_common.cpp",
+ "end_sessions_command.cpp",
"fail_point_cmd.cpp",
"feature_compatibility_version_command_parser.cpp",
"find_and_modify_common.cpp",
@@ -93,6 +94,7 @@ env.Library(
'$BUILD_DIR/mongo/db/lasterror',
'$BUILD_DIR/mongo/db/log_process_details',
'$BUILD_DIR/mongo/db/logical_session_cache',
+ '$BUILD_DIR/mongo/db/logical_session_cache_impl',
'$BUILD_DIR/mongo/db/logical_session_id',
'$BUILD_DIR/mongo/db/logical_session_id_helpers',
'$BUILD_DIR/mongo/db/matcher/expressions',
diff --git a/src/mongo/db/commands/end_sessions.idl b/src/mongo/db/commands/end_sessions.idl
new file mode 100644
index 00000000000..877eddd72f4
--- /dev/null
+++ b/src/mongo/db/commands/end_sessions.idl
@@ -0,0 +1,28 @@
+# Copyright (C) 2017 MongoDB Inc.
+#
+# This program is free software: you can redistribute it and/or modify
+# it under the terms of the GNU Affero General Public License, version 3,
+# as published by the Free Software Foundation.
+#
+# This program is distributed in the hope that it will be useful,
+# but WITHOUT ANY WARRANTY; without even the implied warranty of
+# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+# GNU Affero General Public License for more details.
+#
+# You should have received a copy of the GNU Affero General Public License
+# along with this program. If not, see <http://www.gnu.org/licenses/>.
+#
+
+global:
+ cpp_namespace: "mongo"
+
+imports:
+ - "mongo/db/logical_session_id.idl"
+
+structs:
+
+ EndSessionsCmdFromClient:
+ description: "A struct representing an endSessions command from a client"
+ strict: false
+ fields:
+ endSessions: array<LogicalSessionFromClient>
diff --git a/src/mongo/db/commands/end_sessions_command.cpp b/src/mongo/db/commands/end_sessions_command.cpp
new file mode 100644
index 00000000000..6a07059f472
--- /dev/null
+++ b/src/mongo/db/commands/end_sessions_command.cpp
@@ -0,0 +1,89 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/base/init.h"
+#include "mongo/db/auth/authorization_session.h"
+#include "mongo/db/client.h"
+#include "mongo/db/commands.h"
+#include "mongo/db/logical_session_cache.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+class EndSessionsCommand final : public BasicCommand {
+ MONGO_DISALLOW_COPYING(EndSessionsCommand);
+
+public:
+ EndSessionsCommand() : BasicCommand("endSessions") {}
+
+ bool slaveOk() const override {
+ return true;
+ }
+ bool adminOnly() const override {
+ return false;
+ }
+ bool supportsWriteConcern(const BSONObj& cmd) const override {
+ return false;
+ }
+ void help(std::stringstream& help) const override {
+ help << "end a set of logical sessions";
+ }
+ Status checkAuthForOperation(OperationContext* opCtx,
+ const std::string& dbname,
+ const BSONObj& cmdObj) override {
+
+ // It is always ok to run this command, as long as you are authenticated
+ // as some user, if auth is enabled.
+ AuthorizationSession* authSession = AuthorizationSession::get(opCtx->getClient());
+ try {
+ auto user = authSession->getSingleUser();
+ invariant(user);
+ return Status::OK();
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ }
+
+ virtual bool run(OperationContext* opCtx,
+ const std::string& db,
+ const BSONObj& cmdObj,
+ BSONObjBuilder& result) override {
+
+ auto lsCache = LogicalSessionCache::get(opCtx);
+
+ auto cmd = EndSessionsCmdFromClient::parse("EndSessionsCmdFromClient"_sd, cmdObj);
+
+ lsCache->endSessions(makeLogicalSessionIds(cmd.getEndSessions(), opCtx));
+ return true;
+ }
+} endSessionsCommand;
+
+} // namespace mongo
diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp
index a8ae4219a34..63fe696057d 100644
--- a/src/mongo/db/commands/start_session_command.cpp
+++ b/src/mongo/db/commands/start_session_command.cpp
@@ -90,11 +90,7 @@ public:
return appendCommandStatus(result, status);
}
- Status startSessionStatus = lsCache->startSession(opCtx, record.get());
-
- if (!startSessionStatus.isOK()) {
- return appendCommandStatus(result, startSessionStatus);
- }
+ lsCache->startSession(opCtx, record.get());
makeLogicalSessionToClient(record->getId()).serialize(&result);
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 3a1ce97ef39..f9ff378fc72 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -31,6 +31,7 @@
#include <boost/optional.hpp>
#include "mongo/base/status.h"
+#include "mongo/db/commands/end_sessions_gen.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/refresh_sessions_gen.h"
@@ -67,7 +68,7 @@ public:
* should only be used when starting new sessions and should not be used to
* insert records for existing sessions.
*/
- virtual Status startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0;
+ virtual void startSession(OperationContext* opCtx, LogicalSessionRecord record) = 0;
/**
* Refresh the given sessions. Updates the timestamps of these records in
@@ -85,6 +86,11 @@ public:
virtual void vivify(OperationContext* opCtx, const LogicalSessionId& lsid) = 0;
/**
+ * enqueues LogicalSessionIds for removal during the next _refresh()
+ */
+ virtual void endSessions(const LogicalSessionIdSet& lsids) = 0;
+
+ /**
* Removes all local records in this cache. Does not remove the corresponding
* authoritative session records from the sessions collection.
*/
diff --git a/src/mongo/db/logical_session_cache_impl.cpp b/src/mongo/db/logical_session_cache_impl.cpp
index ffafcccbd4f..624934c2785 100644
--- a/src/mongo/db/logical_session_cache_impl.cpp
+++ b/src/mongo/db/logical_session_cache_impl.cpp
@@ -41,13 +41,10 @@
#include "mongo/util/duration.h"
#include "mongo/util/log.h"
#include "mongo/util/periodic_runner.h"
+#include "mongo/util/scopeguard.h"
namespace mongo {
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize,
- int,
- LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity);
-
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(
logicalSessionRefreshMinutes,
int,
@@ -55,7 +52,6 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(disableLogicalSessionCacheRefresh, bool, false);
-constexpr int LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
constexpr Minutes LogicalSessionCacheImpl::kLogicalSessionDefaultRefresh;
LogicalSessionCacheImpl::LogicalSessionCacheImpl(
@@ -67,8 +63,7 @@ LogicalSessionCacheImpl::LogicalSessionCacheImpl(
_sessionTimeout(options.sessionTimeout),
_service(std::move(service)),
_sessionsColl(std::move(collection)),
- _transactionReaper(std::move(transactionReaper)),
- _cache(options.capacity) {
+ _transactionReaper(std::move(transactionReaper)) {
if (!disableLogicalSessionCacheRefresh) {
_service->scheduleJob(
{[this](Client* client) { _periodicRefresh(client); }, _refreshInterval});
@@ -88,30 +83,27 @@ LogicalSessionCacheImpl::~LogicalSessionCacheImpl() {
}
Status LogicalSessionCacheImpl::promote(LogicalSessionId lsid) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- auto it = _cache.find(lsid);
- if (it == _cache.end()) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ auto it = _activeSessions.find(lsid);
+ if (it == _activeSessions.end()) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
}
- // Update the last use time before returning.
- it->second.setLastUse(now());
return Status::OK();
}
-Status LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
+void LogicalSessionCacheImpl::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
// Add the new record to our local cache. We will insert it into the sessions collection
// the next time _refresh is called. If there is already a record in the cache for this
// session, we'll just write over it with our newer, more recent one.
_addToCache(record);
- return Status::OK();
}
Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
const RefreshSessionsCmdFromClient& cmd) {
// Update the timestamps of all these records in our cache.
auto sessions = makeLogicalSessionIds(cmd.getRefreshSessions(), opCtx);
- for (auto& lsid : sessions) {
+ for (const auto& lsid : sessions) {
if (!promote(lsid).isOK()) {
// This is a new record, insert it.
_addToCache(makeLogicalSessionRecord(opCtx, lsid, now()));
@@ -127,7 +119,7 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
// Update the timestamps of all these records in our cache.
auto records = cmd.getRefreshSessionsInternal();
- for (auto& record : records) {
+ for (const auto& record : records) {
if (!promote(record.getId()).isOK()) {
// This is a new record, insert it.
_addToCache(record);
@@ -136,17 +128,22 @@ Status LogicalSessionCacheImpl::refreshSessions(OperationContext* opCtx,
}
// Write to the sessions collection now.
- return _sessionsColl->refreshSessions(opCtx, toRefresh, now());
+ return _sessionsColl->refreshSessions(opCtx, toRefresh);
}
void LogicalSessionCacheImpl::vivify(OperationContext* opCtx, const LogicalSessionId& lsid) {
if (!promote(lsid).isOK()) {
- startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now())).ignore();
+ startSession(opCtx, makeLogicalSessionRecord(opCtx, lsid, now()));
}
}
Status LogicalSessionCacheImpl::refreshNow(Client* client) {
- return _refresh(client);
+ try {
+ _refresh(client);
+ } catch (...) {
+ return exceptionToStatus();
+ }
+ return Status::OK();
}
Status LogicalSessionCacheImpl::reapNow(Client* client) {
@@ -159,16 +156,15 @@ Date_t LogicalSessionCacheImpl::now() {
size_t LogicalSessionCacheImpl::size() {
stdx::lock_guard<stdx::mutex> lock(_cacheMutex);
- return _cache.size();
+ return _activeSessions.size();
}
void LogicalSessionCacheImpl::_periodicRefresh(Client* client) {
- auto res = _refresh(client);
- if (!res.isOK()) {
- log() << "Failed to refresh session cache: " << res;
+ try {
+ _refresh(client);
+ } catch (...) {
+ log() << "Failed to refresh session cache: " << exceptionToStatus();
}
-
- return;
}
void LogicalSessionCacheImpl::_periodicReap(Client* client) {
@@ -204,115 +200,123 @@ Status LogicalSessionCacheImpl::_reap(Client* client) {
return Status::OK();
}
-Status LogicalSessionCacheImpl::_refresh(Client* client) {
- LogicalSessionRecordSet activeSessions;
- LogicalSessionRecordSet deadSessions;
-
- auto time = now();
-
- // We should avoid situations where we have records in the cache
- // that have been expired from the sessions collection. If they haven't been
- // used in _sessionTimeout, we should just remove them.
+void LogicalSessionCacheImpl::_refresh(Client* client) {
+ LogicalSessionIdSet staleSessions;
+ LogicalSessionIdSet explicitlyEndingSessions;
+ LogicalSessionIdMap<LogicalSessionRecord> activeSessions;
+
+ // backSwapper creates a guard that in the case of a exception
+ // replaces the ending or active sessions that swapped out of of LogicalSessionCache,
+ // and merges in any records that had been added since we swapped them
+ // out.
+ auto backSwapper = [this](auto& member, auto& temp) {
+ return MakeGuard([this, &member, &temp] {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ using std::swap;
+ swap(member, temp);
+ for (const auto& it : temp) {
+ member.emplace(it);
+ }
+ });
+ };
- // Assemble a list of active session records in our cache
- std::vector<decltype(_cache)::ListEntry> cacheCopy;
{
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- cacheCopy.assign(_cache.begin(), _cache.end());
+ using std::swap;
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ swap(explicitlyEndingSessions, _endingSessions);
+ swap(activeSessions, _activeSessions);
}
+ auto activeSessionsBackSwapper = backSwapper(_activeSessions, activeSessions);
+ auto explicitlyEndingBackSwaper = backSwapper(_endingSessions, explicitlyEndingSessions);
- for (auto& it : cacheCopy) {
- auto record = it.second;
- if (!_isDead(record, time)) {
- activeSessions.insert(record);
- } else {
- deadSessions.insert(record);
+ // get or make an opCtx
+
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
}
+
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
+
+ // remove all explicitlyEndingSessions from activeSessions
+ for (const auto& lsid : explicitlyEndingSessions) {
+ activeSessions.erase(lsid);
}
- // Append any active sessions from the service. We should always have
- // cache entries for active sessions. If we don't, then it is a sign that
- // the cache needs to be larger, because active session records are being
- // evicted.
-
- // Promote our cached entries for all active service sessions to be recently-
- // used, and update their lastUse dates so we don't lose them to eviction. We
- // do not need to do this with records from our own cache, which are being used
- // regularly. Sessions for long-running queries, however, must be kept alive
- // by us here.
- auto serviceSessions = _service->getActiveSessions();
- {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- for (auto lsid : serviceSessions) {
- auto it = _cache.promote(lsid);
- if (it != _cache.end()) {
- // If we have not found our record, it may have been removed
- // by another thread.
- it->second.setLastUse(time);
- activeSessions.insert(it->second);
- }
+ // refresh all recently active sessions as well as for sessions attached to running ops
- // TODO SERVER-29709: Rethink how active sessions interact with refreshes,
- // and potentially move this block above the block where we separate
- // dead sessions from live sessions, above.
- activeSessions.insert(makeLogicalSessionRecord(lsid, time));
+ LogicalSessionRecordSet activeSessionRecords{};
+
+ auto runningOpSessions = _service->getActiveOpSessions();
+
+ for (const auto& it : runningOpSessions) {
+ // if a running op is the cause of an upsert, we won't have a user name for the record
+ if (explicitlyEndingSessions.count(it) > 0) {
+ continue;
}
+ LogicalSessionRecord lsr;
+ lsr.setId(it);
+ activeSessionRecords.insert(lsr);
+ }
+ for (const auto& it : activeSessions) {
+ activeSessionRecords.insert(it.second);
}
+ // refresh the active sessions in the sessions collection
+ uassertStatusOK(_sessionsColl->refreshSessions(opCtx, activeSessionRecords));
+ activeSessionsBackSwapper.Dismiss();
- // Query into the sessions collection to do the refresh. If any sessions have
- // failed to refresh, it means their authoritative records were removed, and
- // we should remove such records from our cache as well.
- {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
+ // remove the ending sessions from the sessions collection
+ uassertStatusOK(_sessionsColl->removeRecords(opCtx, explicitlyEndingSessions));
+ explicitlyEndingBackSwaper.Dismiss();
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ // Find which running, but not recently active sessions, are expired, and add them
+ // to the list of sessions to kill cursors for
+
+ KillAllSessionsByPatternSet patterns;
+
+ auto openCursorSessions = _service->getOpenCursorSessions();
+ // think about pruning ending and active out of openCursorSessions
+ auto statusAndRemovedSessions = _sessionsColl->findRemovedSessions(opCtx, openCursorSessions);
- auto res = _sessionsColl->refreshSessions(opCtx, std::move(activeSessions), time);
- if (!res.isOK()) {
- // TODO SERVER-29709: handle network errors here.
- return res;
+ if (statusAndRemovedSessions.isOK()) {
+ auto removedSessions = statusAndRemovedSessions.getValue();
+ for (const auto& lsid : removedSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
}
}
- // Prune any dead records out of the cache. Dead records are ones that failed to
- // refresh, or ones that have expired locally. We don't make an effort to check
- // if the locally-expired records still have live authoritative records in the
- // sessions collection. We also don't attempt to resurrect our expired records.
- // However, we *do* keep records alive if they are active on the service.
- {
- // TODO SERVER-29709: handle expiration separately from failure to refresh.
- }
+ // Add all of the explicitly ended sessions to the list of sessions to kill cursors for
- return Status::OK();
+ for (const auto& lsid : explicitlyEndingSessions) {
+ patterns.emplace(makeKillAllSessionsByPattern(opCtx, lsid));
+ }
+ SessionKiller::Matcher matcher(std::move(patterns));
+ _service->killCursorsWithMatchingSessions(opCtx, std::move(matcher)).ignore();
}
-
void LogicalSessionCacheImpl::clear() {
// TODO: What should this do? Wasn't implemented before
MONGO_UNREACHABLE;
}
-bool LogicalSessionCacheImpl::_isDead(const LogicalSessionRecord& record, Date_t now) const {
- return record.getLastUse() + _sessionTimeout < now;
+void LogicalSessionCacheImpl::endSessions(const LogicalSessionIdSet& sessions) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _endingSessions.insert(begin(sessions), end(sessions));
}
-boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::_addToCache(
- LogicalSessionRecord record) {
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- return _cache.add(record.getId(), std::move(record));
+void LogicalSessionCacheImpl::_addToCache(LogicalSessionRecord record) {
+ stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
+ _activeSessions.insert(std::make_pair(record.getId(), record));
}
std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds() const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
std::vector<LogicalSessionId> ret;
- ret.reserve(_cache.size());
- for (const auto& id : _cache) {
+ ret.reserve(_activeSessions.size());
+ for (const auto& id : _activeSessions) {
ret.push_back(id.first);
}
return ret;
@@ -322,7 +326,7 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
const std::vector<SHA256Block>& userDigests) const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
std::vector<LogicalSessionId> ret;
- for (const auto& it : _cache) {
+ for (const auto& it : _activeSessions) {
if (std::find(userDigests.cbegin(), userDigests.cend(), it.first.getUid()) !=
userDigests.cend()) {
ret.push_back(it.first);
@@ -334,8 +338,8 @@ std::vector<LogicalSessionId> LogicalSessionCacheImpl::listIds(
boost::optional<LogicalSessionRecord> LogicalSessionCacheImpl::peekCached(
const LogicalSessionId& id) const {
stdx::lock_guard<stdx::mutex> lk(_cacheMutex);
- const auto it = _cache.cfind(id);
- if (it == _cache.cend()) {
+ const auto it = _activeSessions.find(id);
+ if (it == _activeSessions.end()) {
return boost::none;
}
return it->second;
diff --git a/src/mongo/db/logical_session_cache_impl.h b/src/mongo/db/logical_session_cache_impl.h
index 688e04a8a37..3c8557166d5 100644
--- a/src/mongo/db/logical_session_cache_impl.h
+++ b/src/mongo/db/logical_session_cache_impl.h
@@ -45,7 +45,6 @@ class Client;
class OperationContext;
class ServiceContext;
-extern int logicalSessionRecordCacheSize;
extern int logicalSessionRefreshMinutes;
/**
@@ -56,7 +55,6 @@ extern int logicalSessionRefreshMinutes;
*/
class LogicalSessionCacheImpl final : public LogicalSessionCache {
public:
- static constexpr int kLogicalSessionCacheDefaultCapacity = 10000;
static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5);
/**
@@ -66,13 +64,6 @@ public:
Options(){};
/**
- * The number of session records to keep in the cache.
- *
- * May be set with --setParameter logicalSessionRecordCacheSize=X.
- */
- int capacity = logicalSessionRecordCacheSize;
-
- /**
* A timeout value to use for sessions in the cache, in minutes.
*
* By default, this is set to 30 minutes.
@@ -108,7 +99,7 @@ public:
Status promote(LogicalSessionId lsid) override;
- Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
+ void startSession(OperationContext* opCtx, LogicalSessionRecord record) override;
Status refreshSessions(OperationContext* opCtx,
const RefreshSessionsCmdFromClient& cmd) override;
@@ -134,13 +125,15 @@ public:
boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override;
+ void endSessions(const LogicalSessionIdSet& sessions) override;
+
private:
/**
* Internal methods to handle scheduling and perform refreshes for active
* session records contained within the cache.
*/
void _periodicRefresh(Client* client);
- Status _refresh(Client* client);
+ void _refresh(Client* client);
void _periodicReap(Client* client);
Status _reap(Client* client);
@@ -153,7 +146,7 @@ private:
/**
* Takes the lock and inserts the given record into the cache.
*/
- boost::optional<LogicalSessionRecord> _addToCache(LogicalSessionRecord record);
+ void _addToCache(LogicalSessionRecord record);
const Minutes _refreshInterval;
const Minutes _sessionTimeout;
@@ -165,7 +158,12 @@ private:
std::shared_ptr<TransactionReaper> _transactionReaper;
mutable stdx::mutex _cacheMutex;
- LRUCache<LogicalSessionId, LogicalSessionRecord, LogicalSessionIdHash> _cache;
+
+ LogicalSessionIdMap<LogicalSessionRecord> _activeSessions;
+
+ LogicalSessionIdSet _endingSessions;
+
+ Date_t lastRefreshTime;
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_noop.h b/src/mongo/db/logical_session_cache_noop.h
index 077efa741b9..ad041d74ca4 100644
--- a/src/mongo/db/logical_session_cache_noop.h
+++ b/src/mongo/db/logical_session_cache_noop.h
@@ -45,9 +45,7 @@ public:
return Status::OK();
}
- Status startSession(OperationContext* opCtx, LogicalSessionRecord record) override {
- return Status::OK();
- }
+ void startSession(OperationContext* opCtx, LogicalSessionRecord record) override {}
Status refreshSessions(OperationContext* opCtx,
const RefreshSessionsCmdFromClient& cmd) override {
@@ -90,6 +88,8 @@ public:
boost::optional<LogicalSessionRecord> peekCached(const LogicalSessionId& id) const override {
return boost::none;
}
+
+ void endSessions(const LogicalSessionIdSet& lsids) override {}
};
} // namespace mongo
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index cb8a226dea7..9001ab59a9a 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -28,6 +28,11 @@
#include "mongo/platform/basic.h"
+#include "mongo/db/auth/authorization_manager.h"
+#include "mongo/db/auth/authorization_session_for_test.h"
+#include "mongo/db/auth/authz_manager_external_state_mock.h"
+#include "mongo/db/auth/authz_session_external_state_mock.h"
+
#include "mongo/bson/oid.h"
#include "mongo/db/auth/user_name.h"
#include "mongo/db/logical_session_cache.h"
@@ -62,6 +67,12 @@ public:
_sessions(std::make_shared<MockSessionsCollectionImpl>()) {}
void setUp() override {
+ auto localManagerState = stdx::make_unique<AuthzManagerExternalStateMock>();
+ localManagerState.get()->setAuthzVersion(AuthorizationManager::schemaVersion28SCRAM);
+ auto uniqueAuthzManager =
+ stdx::make_unique<AuthorizationManager>(std::move(localManagerState));
+ AuthorizationManager::set(&serviceContext, std::move(uniqueAuthzManager));
+
auto client = serviceContext.makeClient("testClient");
_opCtx = client->makeOperationContext();
_client = client.get();
@@ -149,7 +160,7 @@ TEST_F(LogicalSessionCacheTest, PromoteUpdatesLastUse) {
auto start = service()->now();
// Insert the record into the sessions collection with 'start'
- ASSERT(cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start)).isOK());
+ cache()->startSession(opCtx(), makeLogicalSessionRecord(lsid, start));
// Fast forward time and promote
service()->fastForward(Milliseconds(500));
@@ -189,8 +200,7 @@ TEST_F(LogicalSessionCacheTest, StartSession) {
auto lsid = record.getId();
// Test starting a new session
- auto res = cache()->startSession(opCtx(), record);
- ASSERT(res.isOK());
+ cache()->startSession(opCtx(), record);
// Record will not be in the collection yet; refresh must happen first.
ASSERT(!sessions()->has(lsid));
@@ -201,81 +211,27 @@ TEST_F(LogicalSessionCacheTest, StartSession) {
ASSERT(sessions()->has(lsid));
// Try to start the same session again, should succeed.
- res = cache()->startSession(opCtx(), record);
- ASSERT(res.isOK());
+ cache()->startSession(opCtx(), record);
// Try to start a session that is already in the sessions collection but
// is not in our local cache, should succeed.
auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now());
sessions()->add(record2);
- res = cache()->startSession(opCtx(), record2);
- ASSERT(res.isOK());
+ cache()->startSession(opCtx(), record2);
// Try to start a session that has expired from our cache, and is no
// longer in the sessions collection, should succeed
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
sessions()->remove(lsid);
ASSERT(!sessions()->has(lsid));
- res = cache()->startSession(opCtx(), record);
- ASSERT(res.isOK());
-}
-
-// Test that records in the cache are properly refreshed until they expire
-TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
- // Insert two records into the cache
- auto record1 = makeLogicalSessionRecordForTest();
- auto record2 = makeLogicalSessionRecordForTest();
- cache()->startSession(opCtx(), record1).transitional_ignore();
- cache()->startSession(opCtx(), record2).transitional_ignore();
-
- stdx::promise<int> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Advance time to first refresh point, check that refresh happens, and
- // that it includes both our records
- sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) {
- hitRefresh.set_value(sessions.size());
- return Status::OK();
- });
-
- // Wait for the refresh to happen
- clearOpCtx();
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
- refreshFuture.wait();
- ASSERT_EQ(refreshFuture.get(), 2);
-
- sessions()->clearHooks();
-
- stdx::promise<LogicalSessionId> refresh2;
- auto refresh2Future = refresh2.get_future();
-
- // Use one of the records
- setOpCtx();
- auto res = cache()->promote(record1.getId());
- ASSERT(res.isOK());
-
- // Advance time so that one record expires
- // Ensure that first record was refreshed, and second was thrown away
- sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) {
- // We should only have one record here, the other should have expired
- ASSERT_EQ(sessions.size(), size_t(1));
- refresh2.set_value(sessions.begin()->getId());
- return Status::OK();
- });
-
- clearOpCtx();
- service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1));
- ASSERT(cache()->refreshNow(client()).isOK());
- refresh2Future.wait();
- ASSERT_EQ(refresh2Future.get(), record1.getId());
+ cache()->startSession(opCtx(), record);
}
// Test that session cache properly expires lsids after 30 minutes of no use
TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) {
// Insert a lsid
auto record = makeLogicalSessionRecordForTest();
- cache()->startSession(opCtx(), record).transitional_ignore();
+ cache()->startSession(opCtx(), record);
auto res = cache()->promote(record.getId());
ASSERT(res.isOK());
@@ -289,66 +245,12 @@ TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) {
// ASSERT(!res.isOK());
}
-// Test that we keep refreshing sessions that are active on the service
-TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) {
- auto lsid = makeLogicalSessionIdForTest();
-
- // Insert one active lsid on the service, none in the cache
- service()->add(lsid);
-
- int count = 0;
-
- sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) {
- ASSERT_EQ(sessions.size(), size_t(1));
- ASSERT_EQ(sessions.begin()->getId(), lsid);
- count++;
- return Status::OK();
- });
-
- clearOpCtx();
-
- // Force a refresh, it should refresh our active session
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
- ASSERT_EQ(count, 1);
-
- // Force a session timeout, session is still on the service
- service()->fastForward(kSessionTimeout);
- ASSERT(cache()->refreshNow(client()).isOK());
- ASSERT_EQ(count, 2);
-
- // Force another refresh, check that it refreshes that active lsid again
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
- ASSERT_EQ(count, 3);
-}
-
-// Test that the set of lsids we refresh is a sum of cached + active lsids
-TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) {
- // Put one session into the cache, one into the service
- auto lsid1 = makeLogicalSessionIdForTest();
- service()->add(lsid1);
- auto record2 = makeLogicalSessionRecordForTest();
- cache()->startSession(opCtx(), record2).transitional_ignore();
-
- // Both signedLsids refresh
- sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) {
- ASSERT_EQ(sessions.size(), size_t(2));
- return Status::OK();
- });
-
- // Force a refresh
- clearOpCtx();
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
-}
-
// Test large sets of cache-only session lsids
TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) {
- int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
+ int count = 10000;
for (int i = 0; i < count; i++) {
auto record = makeLogicalSessionRecordForTest();
- cache()->startSession(opCtx(), record).transitional_ignore();
+ cache()->startSession(opCtx(), record);
}
// Check that all signedLsids refresh
@@ -363,72 +265,140 @@ TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) {
ASSERT(cache()->refreshNow(client()).isOK());
}
-// Test larger sets of service-only session lsids
-TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) {
- int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
- for (int i = 0; i < count; i++) {
- auto lsid = makeLogicalSessionIdForTest();
- service()->add(lsid);
- }
-
- // Check that all signedLsids refresh
- sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) {
- ASSERT_EQ(sessions.size(), size_t(count));
- return Status::OK();
- });
-
- // Force a refresh
- clearOpCtx();
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
-}
+//
+TEST_F(LogicalSessionCacheTest, RefreshMatrixSessionState) {
+ const std::vector<std::vector<std::string>> stateNames = {
+ {"active", "inactive"},
+ {"running", "not running"},
+ {"expired", "unexpired"},
+ {"ended", "not ended"},
+ {"cursor", "no cursor"},
+ };
+ struct {
+ // results that we test for after the _refresh
+ bool inCollection;
+ bool killed;
+ } testCases[] = {
+ // 0, active, running, expired, ended, cursor
+ {false, true},
+ // 1, inactive, running, expired, ended, cursor
+ {false, true},
+ // 2, active, not running, expired, ended, cursor
+ {false, true},
+ // 3, inactive, not running, expired, ended, cursor
+ {false, true},
+ // 4, active, running, unexpired, ended, cursor
+ {false, true},
+ // 5, inactive, running, unexpired, ended, cursor
+ {false, true},
+ // 6, active, not running, unexpired, ended, cursor
+ {false, true},
+ // 7, inactive, not running, unexpired, ended, cursor
+ {false, true},
+ // 8, active, running, expired, not ended, cursor
+ {true, false},
+ // 9, inactive, running, expired, not ended, cursor
+ {false, true},
+ // 10, active, not running, expired, not ended, cursor
+ {true, false},
+ // 11, inactive, not running, expired, not ended, cursor
+ {false, true},
+ // 12, active, running, unexpired, not ended, cursor
+ {true, false},
+ // 13, inactive, running, unexpired, not ended, cursor
+ {true, false},
+ // 14, active, not running, unexpired, not ended, cursor
+ {true, false},
+ // 15, inactive, not running, unexpired, not ended, cursor
+ {true, false},
+ // 16, active, running, expired, ended, no cursor
+ {false, true},
+ // 17, inactive, running, expired, ended, no cursor
+ {false, true},
+ // 18, active, not running, expired, ended, no cursor
+ {false, true},
+ // 19, inactive, not running, expired, ended, no cursor
+ {false, true},
+ // 20, active, running, unexpired, ended, no cursor
+ {false, true},
+ // 21, inactive, running, unexpired, ended, no cursor
+ {false, true},
+ // 22, active, not running, unexpired, ended, no cursor
+ {false, true},
+ // 23, inactive, not running, unexpired, ended, no cursor
+ {false, true},
+ // 24, active, running, expired, not ended, no cursor
+ {true, false},
+ // 25, inactive, running, expired, not ended, no cursor
+ {false, true},
+ // 26, active, not running, expired, not ended, no cursor
+ {true, false},
+ // 27, inactive, not running, expired, not ended, no cursor
+ {false, false},
+ // 28, active, running, unexpired, not ended, no cursor
+ {true, false},
+ // 29, inactive, running, unexpired, not ended, no cursor
+ {true, false},
+ // 30, active, not running, unexpired, not ended, no cursor
+ {true, false},
+ // 31, inactive, not running, unexpired, not ended, no cursor
+ {true, false},
+ };
+
+ std::vector<LogicalSessionId> ids;
+ for (int i = 0; i < 32; i++) {
+
+ bool active = !(i & 1);
+ bool running = !(i & 2);
+ bool expired = !(i & 4);
+ bool ended = !(i & 8);
+ bool cursor = !(i & 16);
-// Test larger mixed sets of cache/service active sessions
-TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) {
- int count = LogicalSessionCacheImpl::kLogicalSessionCacheDefaultCapacity;
- for (int i = 0; i < count; i++) {
auto lsid = makeLogicalSessionIdForTest();
- service()->add(lsid);
+ ids.push_back(lsid);
+ auto lsRecord = makeLogicalSessionRecord(lsid, service()->now() + Milliseconds(500));
- auto record2 = makeLogicalSessionRecordForTest();
- cache()->startSession(opCtx(), record2).transitional_ignore();
+ if (running) {
+ service()->add(lsid);
+ }
+ if (active) {
+ cache()->startSession(opCtx(), lsRecord);
+ }
+ if (!expired) {
+ sessions()->add(lsRecord);
+ }
+ if (ended) {
+ LogicalSessionIdSet lsidSet;
+ lsidSet.emplace(lsid);
+ cache()->endSessions(lsidSet);
+ }
+ if (cursor) {
+ service()->addCursorSession(lsid);
+ }
}
- int nRefreshed = 0;
-
- // Check that all lsids refresh successfully
- sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
- nRefreshed = sessions.size();
- return Status::OK();
- });
-
// Force a refresh
clearOpCtx();
service()->fastForward(kForceRefresh);
ASSERT(cache()->refreshNow(client()).isOK());
- ASSERT_EQ(nRefreshed, count * 2);
-
- // Remove all of the service sessions, should just refresh the cache entries
- service()->clear();
- sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
- nRefreshed = sessions.size();
- return Status::OK();
- });
-
- // Force another refresh
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
-
- // We should not have refreshed any sessions from the service, only the cache
- ASSERT_EQ(nRefreshed, count);
- // Force a third refresh
- service()->fastForward(kForceRefresh);
- ASSERT(cache()->refreshNow(client()).isOK());
-
- // Again, we should have only refreshed sessions from the cache
- ASSERT_EQ(nRefreshed, count);
+ for (int i = 0; i < 32; i++) {
+ std::stringstream failText;
+ failText << "case " << i << " : ";
+ for (int j = 0; j < 4; j++) {
+ failText << stateNames[j][i >> j & 1] << " ";
+ }
+ failText << " session case failed: ";
+
+ ASSERT(sessions()->has(ids[i]) == testCases[i].inCollection)
+ << failText.str() << (testCases[i].inCollection ? "session wasn't in collection"
+ : "session was in collection");
+ ASSERT((service()->matchKilled(ids[i]) != nullptr) == testCases[i].killed)
+ << failText.str()
+ << (testCases[i].killed ? "session wasn't killed" : "session was killed");
+ }
}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/db/service_liason.h b/src/mongo/db/service_liason.h
index ed7bcf0df86..b51076fefe9 100644
--- a/src/mongo/db/service_liason.h
+++ b/src/mongo/db/service_liason.h
@@ -29,6 +29,7 @@
#pragma once
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/session_killer.h"
#include "mongo/stdx/functional.h"
#include "mongo/util/periodic_runner.h"
#include "mongo/util/time_support.h"
@@ -52,7 +53,12 @@ public:
* Return a list of sessions that are currently being used to run operations
* on this service.
*/
- virtual LogicalSessionIdSet getActiveSessions() const = 0;
+ virtual LogicalSessionIdSet getActiveOpSessions() const = 0;
+
+ /**
+ * Return a list of sessions that are currently attached to open cursors
+ */
+ virtual LogicalSessionIdSet getOpenCursorSessions() const = 0;
/**
* Schedule a job to be run at regular intervals until the server shuts down.
@@ -76,6 +82,12 @@ public:
*/
virtual Date_t now() const = 0;
+ /**
+ * deligaes to a similarly named function on a cursormanager
+ */
+ virtual Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) = 0;
+
protected:
/**
* Returns the service context.
diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp
index 821de302e02..df0d8ab5276 100644
--- a/src/mongo/db/service_liason_mock.cpp
+++ b/src/mongo/db/service_liason_mock.cpp
@@ -42,11 +42,16 @@ MockServiceLiasonImpl::MockServiceLiasonImpl() {
_runner->startup().transitional_ignore();
}
-LogicalSessionIdSet MockServiceLiasonImpl::getActiveSessions() const {
+LogicalSessionIdSet MockServiceLiasonImpl::getActiveOpSessions() const {
stdx::unique_lock<stdx::mutex> lk(_mutex);
return _activeSessions;
}
+LogicalSessionIdSet MockServiceLiasonImpl::getOpenCursorSessions() const {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ return _cursorSessions;
+}
+
void MockServiceLiasonImpl::join() {
_runner->shutdown();
}
@@ -60,9 +65,25 @@ void MockServiceLiasonImpl::scheduleJob(PeriodicRunner::PeriodicJob job) {
return;
}
+
+void MockServiceLiasonImpl::addCursorSession(LogicalSessionId lsid) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cursorSessions.insert(std::move(lsid));
+}
+
+void MockServiceLiasonImpl::removeCursorSession(LogicalSessionId lsid) {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cursorSessions.erase(lsid);
+}
+
+void MockServiceLiasonImpl::clearCursorSession() {
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ _cursorSessions.clear();
+}
+
void MockServiceLiasonImpl::add(LogicalSessionId lsid) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
- _activeSessions.insert(std::move(lsid));
+ _cursorSessions.insert(std::move(lsid));
}
void MockServiceLiasonImpl::remove(LogicalSessionId lsid) {
@@ -83,4 +104,15 @@ int MockServiceLiasonImpl::jobs() {
return _timerFactory->jobs();
}
+const KillAllSessionsByPattern* MockServiceLiasonImpl::matchKilled(const LogicalSessionId& lsid) {
+ return _matcher->match(lsid);
+}
+
+Status MockServiceLiasonImpl::killCursorsWithMatchingSessions(
+ OperationContext* opCtx, const SessionKiller::Matcher& matcher) {
+
+ _matcher = matcher;
+ return Status::OK();
+}
+
} // namespace mongo
diff --git a/src/mongo/db/service_liason_mock.h b/src/mongo/db/service_liason_mock.h
index 39323057e59..f7b6f0be535 100644
--- a/src/mongo/db/service_liason_mock.h
+++ b/src/mongo/db/service_liason_mock.h
@@ -59,7 +59,8 @@ public:
MockServiceLiasonImpl();
// Forwarding methods from the MockServiceLiason
- LogicalSessionIdSet getActiveSessions() const;
+ LogicalSessionIdSet getActiveOpSessions() const;
+ LogicalSessionIdSet getOpenCursorSessions() const;
Date_t now() const;
void scheduleJob(PeriodicRunner::PeriodicJob job);
void join();
@@ -68,15 +69,27 @@ public:
void add(LogicalSessionId lsid);
void remove(LogicalSessionId lsid);
void clear();
+
+ void addCursorSession(LogicalSessionId lsid);
+ void removeCursorSession(LogicalSessionId lsid);
+ void clearCursorSession();
+
void fastForward(Milliseconds time);
int jobs();
+ const KillAllSessionsByPattern* matchKilled(const LogicalSessionId& lsid);
+ Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher);
+
private:
executor::AsyncTimerFactoryMock* _timerFactory;
std::unique_ptr<PeriodicRunnerASIO> _runner;
+ boost::optional<SessionKiller::Matcher> _matcher;
+
mutable stdx::mutex _mutex;
LogicalSessionIdSet _activeSessions;
+ LogicalSessionIdSet _cursorSessions;
};
/**
@@ -87,8 +100,12 @@ public:
explicit MockServiceLiason(std::shared_ptr<MockServiceLiasonImpl> impl)
: _impl(std::move(impl)) {}
- LogicalSessionIdSet getActiveSessions() const override {
- return _impl->getActiveSessions();
+ LogicalSessionIdSet getActiveOpSessions() const override {
+ return _impl->getActiveOpSessions();
+ }
+
+ LogicalSessionIdSet getOpenCursorSessions() const override {
+ return _impl->getOpenCursorSessions();
}
Date_t now() const override {
@@ -103,6 +120,11 @@ public:
return _impl->join();
}
+ Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
+ return _impl->killCursorsWithMatchingSessions(opCtx, matcher);
+ }
+
protected:
ServiceContext* _context() override {
return _serviceContext.get();
diff --git a/src/mongo/db/service_liason_mongod.cpp b/src/mongo/db/service_liason_mongod.cpp
index 7fc58d53d24..8c4ad33a987 100644
--- a/src/mongo/db/service_liason_mongod.cpp
+++ b/src/mongo/db/service_liason_mongod.cpp
@@ -40,7 +40,7 @@
namespace mongo {
-LogicalSessionIdSet ServiceLiasonMongod::getActiveSessions() const {
+LogicalSessionIdSet ServiceLiasonMongod::getActiveOpSessions() const {
LogicalSessionIdSet activeSessions;
invariant(hasGlobalServiceContext());
@@ -62,25 +62,26 @@ LogicalSessionIdSet ServiceLiasonMongod::getActiveSessions() const {
activeSessions.insert(*lsid);
}
}
+ return activeSessions;
+}
+LogicalSessionIdSet ServiceLiasonMongod::getOpenCursorSessions() const {
+ LogicalSessionIdSet cursorSessions;
// Append any in-use session ids from the global and collection-level cursor managers
- {
- boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
- auto client = Client::getCurrent();
+ boost::optional<ServiceContext::UniqueOperationContext> uniqueCtx;
+ auto client = Client::getCurrent();
- auto* const opCtx = [&client, &uniqueCtx] {
- if (client->getOperationContext()) {
- return client->getOperationContext();
- }
-
- uniqueCtx.emplace(client->makeOperationContext());
- return uniqueCtx->get();
- }();
+ auto* const opCtx = [&client, &uniqueCtx] {
+ if (client->getOperationContext()) {
+ return client->getOperationContext();
+ }
- CursorManager::appendAllActiveSessions(opCtx, &activeSessions);
- }
+ uniqueCtx.emplace(client->makeOperationContext());
+ return uniqueCtx->get();
+ }();
- return activeSessions;
+ CursorManager::appendAllActiveSessions(opCtx, &cursorSessions);
+ return cursorSessions;
}
void ServiceLiasonMongod::scheduleJob(PeriodicRunner::PeriodicJob job) {
@@ -102,4 +103,9 @@ ServiceContext* ServiceLiasonMongod::_context() {
return getGlobalServiceContext();
}
+Status ServiceLiasonMongod::killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
+ return CursorManager::getGlobalCursorManager()->killCursorsWithMatchingSessions(opCtx, matcher);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/service_liason_mongod.h b/src/mongo/db/service_liason_mongod.h
index 31304b94573..3feb502c437 100644
--- a/src/mongo/db/service_liason_mongod.h
+++ b/src/mongo/db/service_liason_mongod.h
@@ -50,7 +50,8 @@ namespace mongo {
*/
class ServiceLiasonMongod : public ServiceLiason {
public:
- LogicalSessionIdSet getActiveSessions() const override;
+ LogicalSessionIdSet getActiveOpSessions() const override;
+ LogicalSessionIdSet getOpenCursorSessions() const override;
void scheduleJob(PeriodicRunner::PeriodicJob job) override;
@@ -58,6 +59,9 @@ public:
Date_t now() const override;
+ Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) override;
+
protected:
/**
* Returns the service context.
diff --git a/src/mongo/db/service_liason_mongos.cpp b/src/mongo/db/service_liason_mongos.cpp
index bf0bb81b70a..37c5420d28d 100644
--- a/src/mongo/db/service_liason_mongos.cpp
+++ b/src/mongo/db/service_liason_mongos.cpp
@@ -39,7 +39,7 @@
namespace mongo {
-LogicalSessionIdSet ServiceLiasonMongos::getActiveSessions() const {
+LogicalSessionIdSet ServiceLiasonMongos::getActiveOpSessions() const {
LogicalSessionIdSet activeSessions;
invariant(hasGlobalServiceContext());
@@ -51,6 +51,12 @@ LogicalSessionIdSet ServiceLiasonMongos::getActiveSessions() const {
return activeSessions;
}
+LogicalSessionIdSet ServiceLiasonMongos::getOpenCursorSessions() const {
+ LogicalSessionIdSet openCursorSessions;
+
+ return openCursorSessions;
+}
+
void ServiceLiasonMongos::scheduleJob(PeriodicRunner::PeriodicJob job) {
invariant(hasGlobalServiceContext());
getGlobalServiceContext()->getPeriodicRunner()->scheduleJob(std::move(job));
@@ -70,4 +76,10 @@ ServiceContext* ServiceLiasonMongos::_context() {
return getGlobalServiceContext();
}
+Status ServiceLiasonMongos::killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) {
+ auto cursorManager = Grid::get(getGlobalServiceContext())->getCursorManager();
+ return cursorManager->killCursorsWithMatchingSessions(opCtx, matcher);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/service_liason_mongos.h b/src/mongo/db/service_liason_mongos.h
index 79bc5df3fa7..26780f7b702 100644
--- a/src/mongo/db/service_liason_mongos.h
+++ b/src/mongo/db/service_liason_mongos.h
@@ -50,7 +50,8 @@ namespace mongo {
*/
class ServiceLiasonMongos : public ServiceLiason {
public:
- LogicalSessionIdSet getActiveSessions() const override;
+ LogicalSessionIdSet getActiveOpSessions() const override;
+ LogicalSessionIdSet getOpenCursorSessions() const override;
void scheduleJob(PeriodicRunner::PeriodicJob job) override;
@@ -58,6 +59,9 @@ public:
Date_t now() const override;
+ Status killCursorsWithMatchingSessions(OperationContext* opCtx,
+ const SessionKiller::Matcher& matcher) override;
+
protected:
/**
* Returns the service context.
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index b6341eb0423..69e420f40b8 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -55,15 +55,15 @@ BSONObj lsidQuery(const LogicalSessionRecord& record) {
return lsidQuery(record.getId());
}
-BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) {
+BSONObj updateQuery(const LogicalSessionRecord& record) {
// { $max : { lastUse : <time> }, $setOnInsert : { user : <user> } }
// Build our update doc.
BSONObjBuilder updateBuilder;
{
- BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$max"));
- maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, refreshTime);
+ BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$currentDate"));
+ maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, true);
}
if (record.getUser()) {
@@ -200,17 +200,15 @@ SessionsCollection::FindBatchFn SessionsCollection::makeFindFnForCommand(const N
Status SessionsCollection::doRefresh(const NamespaceString& ns,
const LogicalSessionRecordSet& sessions,
- Date_t refreshTime,
SendBatchFn send) {
auto init = [ns](BSONObjBuilder* batch) {
batch->append("update", ns.coll());
batch->append("ordered", false);
};
- auto add = [&refreshTime](BSONArrayBuilder* entries, const LogicalSessionRecord& record) {
- entries->append(BSON("q" << lsidQuery(record) << "u" << updateQuery(record, refreshTime)
- << "upsert"
- << true));
+ auto add = [](BSONArrayBuilder* entries, const LogicalSessionRecord& record) {
+ entries->append(
+ BSON("q" << lsidQuery(record) << "u" << updateQuery(record) << "upsert" << true));
};
return runBulkCmd("updates", init, add, send, sessions);
@@ -218,12 +216,12 @@ Status SessionsCollection::doRefresh(const NamespaceString& ns,
Status SessionsCollection::doRefreshExternal(const NamespaceString& ns,
const LogicalSessionRecordSet& sessions,
- Date_t refreshTime,
SendBatchFn send) {
auto makeT = [] { return std::vector<LogicalSessionRecord>{}; };
- auto add = [&refreshTime](std::vector<LogicalSessionRecord>& batch,
- const LogicalSessionRecord& record) { batch.push_back(record); };
+ auto add = [](std::vector<LogicalSessionRecord>& batch, const LogicalSessionRecord& record) {
+ batch.push_back(record);
+ };
auto sendLocal = [&](std::vector<LogicalSessionRecord>& batch) {
RefreshSessionsCmdFromClusterMember idl;
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index e86e29cd67b..9d29924069c 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -60,8 +60,7 @@ public:
* or equal to the given time. Returns an error if a networking issue occurred.
*/
virtual Status refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) = 0;
+ const LogicalSessionRecordSet& sessions) = 0;
/**
* Removes the authoritative records for the specified sessions.
@@ -99,11 +98,9 @@ protected:
*/
Status doRefresh(const NamespaceString& ns,
const LogicalSessionRecordSet& sessions,
- Date_t refreshTime,
SendBatchFn send);
Status doRefreshExternal(const NamespaceString& ns,
const LogicalSessionRecordSet& sessions,
- Date_t refreshTime,
SendBatchFn send);
/**
diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp
index b70f8e015bf..7f2cff63e70 100644
--- a/src/mongo/db/sessions_collection_mock.cpp
+++ b/src/mongo/db/sessions_collection_mock.cpp
@@ -103,4 +103,16 @@ Status MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& ses
return Status::OK();
}
+StatusWith<LogicalSessionIdSet> MockSessionsCollectionImpl::findRemovedSessions(
+ OperationContext* opCtx, const LogicalSessionIdSet& sessions) {
+ LogicalSessionIdSet lsids;
+ stdx::unique_lock<stdx::mutex> lk(_mutex);
+ for (auto& lsid : sessions) {
+ if (_sessions.find(lsid) == _sessions.end()) {
+ lsids.emplace(lsid);
+ }
+ }
+ return lsids;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h
index 6a88a3af816..fb20533ed60 100644
--- a/src/mongo/db/sessions_collection_mock.h
+++ b/src/mongo/db/sessions_collection_mock.h
@@ -79,6 +79,9 @@ public:
void clearSessions();
const SessionMap& sessions() const;
+ StatusWith<LogicalSessionIdSet> findRemovedSessions(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions);
+
private:
// Default implementations, may be overridden with custom hooks.
Status _refreshSessions(const LogicalSessionRecordSet& sessions);
@@ -102,8 +105,7 @@ public:
: _impl(std::move(impl)) {}
Status refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) override {
+ const LogicalSessionRecordSet& sessions) override {
return _impl->refreshSessions(sessions);
}
@@ -113,7 +115,7 @@ public:
StatusWith<LogicalSessionIdSet> findRemovedSessions(
OperationContext* opCtx, const LogicalSessionIdSet& sessions) override {
- return LogicalSessionIdSet{};
+ return _impl->findRemovedSessions(opCtx, sessions);
}
Status removeTransactionRecords(OperationContext* opCtx,
diff --git a/src/mongo/db/sessions_collection_rs.cpp b/src/mongo/db/sessions_collection_rs.cpp
index f4d5f44b453..4cfbffff3c3 100644
--- a/src/mongo/db/sessions_collection_rs.cpp
+++ b/src/mongo/db/sessions_collection_rs.cpp
@@ -132,8 +132,7 @@ auto dispatch(const NamespaceString& ns,
} // namespace
Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) {
+ const LogicalSessionRecordSet& sessions) {
return dispatch(
kSessionsNamespaceString,
MODE_IX,
@@ -142,13 +141,11 @@ Status SessionsCollectionRS::refreshSessions(OperationContext* opCtx,
DBDirectClient client(opCtx);
return doRefresh(kSessionsNamespaceString,
sessions,
- refreshTime,
makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
},
[&](DBClientBase* client) {
return doRefreshExternal(kSessionsNamespaceString,
sessions,
- refreshTime,
makeSendFnForCommand(kSessionsNamespaceString, client));
});
}
diff --git a/src/mongo/db/sessions_collection_rs.h b/src/mongo/db/sessions_collection_rs.h
index cc49ecf511b..08459d6ff4f 100644
--- a/src/mongo/db/sessions_collection_rs.h
+++ b/src/mongo/db/sessions_collection_rs.h
@@ -58,8 +58,7 @@ public:
* or equal to the current time.
*/
Status refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) override;
+ const LogicalSessionRecordSet& sessions) override;
/**
* Removes the authoritative records for the specified sessions.
diff --git a/src/mongo/db/sessions_collection_sharded.cpp b/src/mongo/db/sessions_collection_sharded.cpp
index 544915c8494..38738207196 100644
--- a/src/mongo/db/sessions_collection_sharded.cpp
+++ b/src/mongo/db/sessions_collection_sharded.cpp
@@ -53,8 +53,7 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) {
} // namespace
Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) {
+ const LogicalSessionRecordSet& sessions) {
auto send = [&](BSONObj toSend) {
auto opMsg = OpMsgRequest::fromDBAndBody(SessionsCollection::kSessionsDb, toSend);
auto request = BatchedCommandRequest::parseUpdate(opMsg);
@@ -72,7 +71,7 @@ Status SessionsCollectionSharded::refreshSessions(OperationContext* opCtx,
return Status(error, response.getErrMessage());
};
- return doRefresh(kSessionsNamespaceString, sessions, refreshTime, send);
+ return doRefresh(kSessionsNamespaceString, sessions, send);
}
Status SessionsCollectionSharded::removeRecords(OperationContext* opCtx,
diff --git a/src/mongo/db/sessions_collection_sharded.h b/src/mongo/db/sessions_collection_sharded.h
index f9a8165a1c7..01ac730e09a 100644
--- a/src/mongo/db/sessions_collection_sharded.h
+++ b/src/mongo/db/sessions_collection_sharded.h
@@ -48,8 +48,7 @@ public:
* or equal to the current time.
*/
Status refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) override;
+ const LogicalSessionRecordSet& sessions) override;
/**
* Removes the authoritative records for the specified sessions.
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
index ada3583bb26..84c2b4daae8 100644
--- a/src/mongo/db/sessions_collection_standalone.cpp
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -45,12 +45,10 @@ BSONObj lsidQuery(const LogicalSessionId& lsid) {
} // namespace
Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) {
+ const LogicalSessionRecordSet& sessions) {
DBDirectClient client(opCtx);
return doRefresh(kSessionsNamespaceString,
sessions,
- refreshTime,
makeSendFnForBatchWrite(kSessionsNamespaceString, &client));
}
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
index 2ecd49afa79..ad5f155dd64 100644
--- a/src/mongo/db/sessions_collection_standalone.h
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -47,8 +47,7 @@ public:
* or equal to the current time.
*/
Status refreshSessions(OperationContext* opCtx,
- const LogicalSessionRecordSet& sessions,
- Date_t refreshTime) override;
+ const LogicalSessionRecordSet& sessions) override;
/**
* Removes the authoritative records for the specified sessions.
diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp
index 51614181623..13ca0c434c8 100644
--- a/src/mongo/dbtests/logical_sessions_tests.cpp
+++ b/src/mongo/dbtests/logical_sessions_tests.cpp
@@ -153,27 +153,27 @@ public:
auto thePast = now - Minutes(5);
// Attempt to refresh with no active records, should succeed (and do nothing).
- auto resRefresh = collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{}, now);
+ auto resRefresh = collection()->refreshSessions(opCtx(), LogicalSessionRecordSet{});
ASSERT(resRefresh.isOK());
// Attempt to refresh one active record, should succeed.
auto record1 = makeRecord(thePast);
auto res = insertRecord(opCtx(), record1);
ASSERT_OK(res);
- resRefresh = collection()->refreshSessions(opCtx(), {record1}, now);
+ resRefresh = collection()->refreshSessions(opCtx(), {record1});
ASSERT(resRefresh.isOK());
// The timestamp on the refreshed record should be updated.
auto swRecord = fetchRecord(opCtx(), record1.getId());
ASSERT(swRecord.isOK());
- ASSERT_EQ(swRecord.getValue().getLastUse(), now);
+ ASSERT_GTE(swRecord.getValue().getLastUse(), now);
// Clear the collection.
db.remove(ns(), BSONObj());
// Attempt to refresh a record that is not present, should upsert it.
auto record2 = makeRecord(thePast);
- resRefresh = collection()->refreshSessions(opCtx(), {record2}, now);
+ resRefresh = collection()->refreshSessions(opCtx(), {record2});
ASSERT(resRefresh.isOK());
swRecord = fetchRecord(opCtx(), record2.getId());
@@ -185,23 +185,26 @@ public:
// Attempt a refresh of many records, split into batches.
LogicalSessionRecordSet toRefresh;
int recordCount = 5000;
+ unsigned int notRefreshed = 0;
for (int i = 0; i < recordCount; i++) {
- auto record = makeRecord(thePast);
+ auto record = makeRecord(now);
res = insertRecord(opCtx(), record);
// Refresh some of these records.
if (i % 4 == 0) {
toRefresh.insert(record);
+ } else {
+ notRefreshed++;
}
}
// Run the refresh, should succeed.
- resRefresh = collection()->refreshSessions(opCtx(), toRefresh, now);
+ resRefresh = collection()->refreshSessions(opCtx(), toRefresh);
ASSERT(resRefresh.isOK());
// Ensure that the right number of timestamps were updated.
auto n = db.count(ns(), BSON("lastUse" << now));
- ASSERT_EQ(n, toRefresh.size());
+ ASSERT_EQ(n, notRefreshed);
}
};