summaryrefslogtreecommitdiff
path: root/src/mongo
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo')
-rw-r--r--src/mongo/db/SConscript22
-rw-r--r--src/mongo/db/auth/authorization_session.cpp16
-rw-r--r--src/mongo/db/auth/authorization_session.h4
-rw-r--r--src/mongo/db/commands/start_session_command.cpp11
-rw-r--r--src/mongo/db/logical_session_cache.cpp95
-rw-r--r--src/mongo/db/logical_session_cache.h20
-rw-r--r--src/mongo/db/logical_session_cache_factory_mongod.cpp5
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp332
-rw-r--r--src/mongo/db/logical_session_id.cpp59
-rw-r--r--src/mongo/db/logical_session_id.h7
-rw-r--r--src/mongo/db/logical_session_id.idl17
-rw-r--r--src/mongo/db/logical_session_id_helpers.cpp71
-rw-r--r--src/mongo/db/logical_session_id_helpers.h14
-rw-r--r--src/mongo/db/namespace_string.cpp2
-rw-r--r--src/mongo/db/s/migration_util.cpp1
-rw-r--r--src/mongo/db/service_liason_mock.cpp3
-rw-r--r--src/mongo/db/sessions_collection.cpp117
-rw-r--r--src/mongo/db/sessions_collection.h46
-rw-r--r--src/mongo/db/sessions_collection_mock.cpp60
-rw-r--r--src/mongo/db/sessions_collection_mock.h42
-rw-r--r--src/mongo/db/sessions_collection_standalone.cpp91
-rw-r--r--src/mongo/db/sessions_collection_standalone.h69
-rw-r--r--src/mongo/dbtests/SConscript2
-rw-r--r--src/mongo/dbtests/logical_sessions_tests.cpp196
-rw-r--r--src/mongo/idl/idl_test.cpp9
-rw-r--r--src/mongo/util/periodic_runner.h6
-rw-r--r--src/mongo/util/periodic_runner_asio.cpp9
-rw-r--r--src/mongo/util/periodic_runner_asio.h3
-rw-r--r--src/mongo/util/periodic_runner_asio_test.cpp13
29 files changed, 935 insertions, 407 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript
index 9900e938843..e0737310d2e 100644
--- a/src/mongo/db/SConscript
+++ b/src/mongo/db/SConscript
@@ -879,6 +879,7 @@ env.Library(
'$BUILD_DIR/mongo/crypto/sha_block_${MONGO_CRYPTO}',
'$BUILD_DIR/mongo/idl/idl_parser',
'$BUILD_DIR/mongo/util/uuid',
+ 'server_parameters',
],
)
@@ -889,7 +890,6 @@ env.Library(
],
LIBDEPS=[
'logical_session_id',
- 'logical_session_cache',
'$BUILD_DIR/mongo/db/auth/authcore',
],
)
@@ -977,6 +977,8 @@ env.Library(
'sessions_collection.cpp',
],
LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ 'logical_session_id',
],
)
@@ -993,12 +995,26 @@ env.Library(
)
env.Library(
+ target='sessions_collection_standalone',
+ source=[
+ 'sessions_collection_standalone.cpp',
+ ],
+ LIBDEPS=[
+ '$BUILD_DIR/mongo/base',
+ 'dbdirectclient',
+ 'logical_session_id',
+ 'sessions_collection',
+ ],
+)
+
+env.Library(
target='logical_session_cache',
source=[
'logical_session_cache.cpp',
],
LIBDEPS=[
'logical_session_id',
+ 'logical_session_id_helpers',
'sessions_collection',
'server_parameters',
'service_liason',
@@ -1016,6 +1032,7 @@ envWithAsio.CppUnitTest(
'keys_collection_manager',
'keys_collection_document',
'logical_clock',
+ 'logical_session_id',
'logical_session_id_helpers',
'logical_session_cache',
'service_liason_mock',
@@ -1031,7 +1048,8 @@ envWithAsio.Library(
LIBDEPS=[
'logical_session_cache',
'service_liason_mongod',
- 'sessions_collection_mock', # TODO SERVER-29201, SERVER-29202, SERVER-29203
+ 'sessions_collection_mock', # TODO SERVER-29202, SERVER-29203
+ 'sessions_collection_standalone',
],
)
diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp
index 42735813bb9..a29e1c2ff5e 100644
--- a/src/mongo/db/auth/authorization_session.cpp
+++ b/src/mongo/db/auth/authorization_session.cpp
@@ -176,6 +176,22 @@ User* AuthorizationSession::lookupUser(const UserName& name) {
return _authenticatedUsers.lookup(name);
}
+User* AuthorizationSession::getSingleUser() {
+ UserName userName;
+
+ auto userNameItr = getAuthenticatedUserNames();
+ if (userNameItr.more()) {
+ userName = userNameItr.next();
+ if (userNameItr.more()) {
+ uasserted(ErrorCodes::Unauthorized, "there are no users authenticated");
+ }
+ } else {
+ uasserted(ErrorCodes::Unauthorized, "too many users are authenticated");
+ }
+
+ return lookupUser(userName);
+}
+
void AuthorizationSession::logoutDatabase(const std::string& dbname) {
User* removedUser = _authenticatedUsers.removeByDBName(dbname);
if (removedUser) {
diff --git a/src/mongo/db/auth/authorization_session.h b/src/mongo/db/auth/authorization_session.h
index 65588ab0708..eccb2dcbbc3 100644
--- a/src/mongo/db/auth/authorization_session.h
+++ b/src/mongo/db/auth/authorization_session.h
@@ -119,6 +119,10 @@ public:
// and ownership of the user stays with the AuthorizationManager
User* lookupUser(const UserName& name);
+ // Returns the single user on this auth session. If no user is authenticated, or if
+ // multiple users are authenticated, this method will throw an exception.
+ User* getSingleUser();
+
// Gets an iterator over the names of all authenticated users stored in this manager.
UserNameIterator getAuthenticatedUserNames();
diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp
index cd241c7a310..2e1d00b455f 100644
--- a/src/mongo/db/commands/start_session_command.cpp
+++ b/src/mongo/db/commands/start_session_command.cpp
@@ -82,25 +82,24 @@ public:
auto client = opCtx->getClient();
ServiceContext* serviceContext = client->getServiceContext();
- boost::optional<LogicalSessionId> lsid;
+ auto lsCache = LogicalSessionCache::get(serviceContext);
+ boost::optional<LogicalSessionRecord> record;
try {
- lsid = makeLogicalSessionId(opCtx);
+ record = makeLogicalSessionRecord(opCtx, lsCache->now());
} catch (...) {
auto status = exceptionToStatus();
return appendCommandStatus(result, status);
}
- auto lsCache = LogicalSessionCache::get(serviceContext);
-
- Status startSessionStatus = lsCache->startSession(lsid.get());
+ Status startSessionStatus = lsCache->startSession(opCtx, record.get());
if (!startSessionStatus.isOK()) {
return appendCommandStatus(result, startSessionStatus);
}
- makeLogicalSessionToClient(lsid.get()).serialize(&result);
+ makeLogicalSessionToClient(record->getId()).serialize(&result);
return true;
}
diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp
index 36c1625a69c..7baeb66593d 100644
--- a/src/mongo/db/logical_session_cache.cpp
+++ b/src/mongo/db/logical_session_cache.cpp
@@ -33,6 +33,7 @@
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id_helpers.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/server_parameters.h"
#include "mongo/db/service_context.h"
@@ -51,16 +52,11 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize,
int,
LogicalSessionCache::kLogicalSessionCacheDefaultCapacity);
-MONGO_EXPORT_STARTUP_SERVER_PARAMETER(localLogicalSessionTimeoutMinutes,
- int,
- LogicalSessionCache::kLogicalSessionDefaultTimeout.count());
-
MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRefreshMinutes,
int,
LogicalSessionCache::kLogicalSessionDefaultRefresh.count());
constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
-constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultTimeout;
constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh;
LogicalSessionCache* LogicalSessionCache::get(ServiceContext* service) {
@@ -85,7 +81,7 @@ LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service,
_service(std::move(service)),
_sessionsColl(std::move(collection)),
_cache(options.capacity) {
- PeriodicRunner::PeriodicJob job{[this] { _refresh(); },
+ PeriodicRunner::PeriodicJob job{[this](Client* client) { _refresh(client); },
duration_cast<Milliseconds>(_refreshInterval)};
_service->scheduleJob(std::move(job));
}
@@ -100,7 +96,9 @@ LogicalSessionCache::~LogicalSessionCache() {
}
}
-Status LogicalSessionCache::fetchAndPromote(LogicalSessionId lsid) {
+// TODO: fetch should attempt to update user info, if it is not in the found record.
+
+Status LogicalSessionCache::fetchAndPromote(OperationContext* opCtx, const LogicalSessionId& lsid) {
// Search our local cache first
auto promoteRes = promote(lsid);
if (promoteRes.isOK()) {
@@ -108,12 +106,12 @@ Status LogicalSessionCache::fetchAndPromote(LogicalSessionId lsid) {
}
// Cache miss, must fetch from the sessions collection.
- auto res = _sessionsColl->fetchRecord(lsid);
+ auto res = _sessionsColl->fetchRecord(opCtx, lsid);
// If we got a valid record, add it to our cache.
if (res.isOK()) {
auto& record = res.getValue();
- record.setLastUse(_service->now());
+ record.setLastUse(now());
// Any duplicate records here are actually the same record with different
// lastUse times, ignore them.
@@ -133,36 +131,26 @@ Status LogicalSessionCache::promote(LogicalSessionId lsid) {
}
// Do not use records if they have expired.
- auto now = _service->now();
- if (_isDead(it->second, now)) {
+ auto time = now();
+ if (_isDead(it->second, time)) {
return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"};
}
// Update the last use time before returning.
- it->second.setLastUse(now);
+ it->second.setLastUse(time);
return Status::OK();
}
-Status LogicalSessionCache::startSession(LogicalSessionId lsid) {
- LogicalSessionRecord lsr;
- lsr.setId(lsid);
- lsr.setLastUse(_service->now());
-
- // Attempt to insert into the sessions collection first. This collection enforces
- // unique session ids, so it will act as concurrency control for us.
- auto res = _sessionsColl->insertRecord(lsr);
- if (!res.isOK()) {
- return res;
- }
+Status LogicalSessionCache::startSession(OperationContext* opCtx, LogicalSessionRecord record) {
+ // Add the new record to our local cache. We will insert it into the sessions collection
+ // the next time _refresh is called.
- // Add the new record to our local cache. If we get a conflict here, and the
- // conflicting record is not dead and is not equal to our record, an interloper
- // may have ended this session and then created a new one with the same id.
- // In this case, return a failure.
- auto oldRecord = _addToCache(lsr);
+ // If we get a conflict here, then an interloper may have ended this session
+ // and then created a new one with the same id. In this case, return a failure.
+ auto oldRecord = _addToCache(record);
if (oldRecord) {
- if (*oldRecord != lsr) {
- if (!_isDead(*oldRecord, _service->now())) {
+ if (*oldRecord != record) {
+ if (!_isDead(*oldRecord, now())) {
return {ErrorCodes::DuplicateSession, "session with this id already exists"};
}
}
@@ -171,11 +159,19 @@ Status LogicalSessionCache::startSession(LogicalSessionId lsid) {
return Status::OK();
}
-void LogicalSessionCache::_refresh() {
- LogicalSessionIdSet activeSessions;
- LogicalSessionIdSet deadSessions;
+void LogicalSessionCache::refreshNow(Client* client) {
+ return _refresh(client);
+}
+
+Date_t LogicalSessionCache::now() {
+ return _service->now();
+}
- auto now = _service->now();
+void LogicalSessionCache::_refresh(Client* client) {
+ LogicalSessionRecordSet activeSessions;
+ LogicalSessionRecordSet deadSessions;
+
+ auto time = now();
// We should avoid situations where we have records in the cache
// that have been expired from the sessions collection. If they haven't been
@@ -190,10 +186,10 @@ void LogicalSessionCache::_refresh() {
for (auto& it : cacheCopy) {
auto record = it.second;
- if (!_isDead(record, now)) {
- activeSessions.insert(LogicalSessionId{record.getId()});
+ if (!_isDead(record, time)) {
+ activeSessions.insert(record);
} else {
- deadSessions.insert(LogicalSessionId{record.getId()});
+ deadSessions.insert(record);
}
}
@@ -215,17 +211,28 @@ void LogicalSessionCache::_refresh() {
if (it != _cache.end()) {
// If we have not found our record, it may have been removed
// by another thread.
- it->second.setLastUse(now);
+ it->second.setLastUse(time);
+ activeSessions.insert(it->second);
}
- activeSessions.insert(lsid);
+ // TODO SERVER-29709: Rethink how active sessions interact with refreshes,
+ // and potentially move this block above the block where we separate
+ // dead sessions from live sessions, above.
+ activeSessions.insert(makeLogicalSessionRecord(lsid, time));
}
}
// Query into the sessions collection to do the refresh. If any sessions have
// failed to refresh, it means their authoritative records were removed, and
// we should remove such records from our cache as well.
- auto failedToRefresh = _sessionsColl->refreshSessions(std::move(activeSessions));
+ {
+ auto opCtx = client->makeOperationContext();
+ auto res = _sessionsColl->refreshSessions(opCtx.get(), std::move(activeSessions), time);
+ if (!res.isOK()) {
+ // TODO SERVER-29709: handle network errors here.
+ return;
+ }
+ }
// Prune any dead records out of the cache. Dead records are ones that failed to
// refresh, or ones that have expired locally. We don't make an effort to check
@@ -233,13 +240,7 @@ void LogicalSessionCache::_refresh() {
// sessions collection. We also don't attempt to resurrect our expired records.
// However, we *do* keep records alive if they are active on the service.
{
- stdx::unique_lock<stdx::mutex> lk(_cacheMutex);
- for (auto deadId : failedToRefresh) {
- auto it = serviceSessions.find(deadId);
- if (it == serviceSessions.end()) {
- _cache.erase(deadId);
- }
- }
+ // TODO SERVER-29709: handle expiration separately from failure to refresh.
}
}
diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h
index 5d901c191ce..d5d8483f751 100644
--- a/src/mongo/db/logical_session_cache.h
+++ b/src/mongo/db/logical_session_cache.h
@@ -39,11 +39,11 @@
namespace mongo {
+class Client;
class OperationContext;
class ServiceContext;
extern int logicalSessionRecordCacheSize;
-extern int localLogicalSessionTimeoutMinutes;
extern int logicalSessionRefreshMinutes;
/**
@@ -62,7 +62,6 @@ public:
static void set(ServiceContext* service, std::unique_ptr<LogicalSessionCache> sessionCache);
static constexpr int kLogicalSessionCacheDefaultCapacity = 10000;
- static constexpr Minutes kLogicalSessionDefaultTimeout = Minutes(30);
static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5);
/**
@@ -127,7 +126,7 @@ public:
*
* This method may issue networking calls.
*/
- Status fetchAndPromote(LogicalSessionId lsid);
+ Status fetchAndPromote(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
* Inserts a new authoritative session record into the cache. This method will
@@ -135,7 +134,7 @@ public:
* should only be used when starting new sessions and should not be used to
* insert records for existing sessions.
*/
- Status startSession(LogicalSessionId lsid);
+ Status startSession(OperationContext* opCtx, LogicalSessionRecord record);
/**
* Removes all local records in this cache. Does not remove the corresponding
@@ -143,12 +142,23 @@ public:
*/
void clear();
+ /**
+ * Refreshes the cache synchronously. This flushes all pending refreshes and
+ * inserts to the sessions collection.
+ */
+ void refreshNow(Client* client);
+
+ /**
+ * Returns the current time.
+ */
+ Date_t now();
+
private:
/**
* Internal methods to handle scheduling and perform refreshes for active
* session records contained within the cache.
*/
- void _refresh();
+ void _refresh(Client* client);
/**
* Returns true if a record has passed its given expiration.
diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp
index 6f253ca5116..883e9a3da62 100644
--- a/src/mongo/db/logical_session_cache_factory_mongod.cpp
+++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp
@@ -34,6 +34,7 @@
#include "mongo/db/service_liason_mongod.h"
#include "mongo/db/sessions_collection_mock.h"
+#include "mongo/db/sessions_collection_standalone.h"
#include "mongo/stdx/memory.h"
namespace mongo {
@@ -51,9 +52,7 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe
return stdx::make_unique<MockSessionsCollection>(
std::make_shared<MockSessionsCollectionImpl>());
case LogicalSessionCacheServer::kStandalone:
- // TODO SERVER-29201, replace with SessionsCollectionStandalone
- return stdx::make_unique<MockSessionsCollection>(
- std::make_shared<MockSessionsCollectionImpl>());
+ return stdx::make_unique<SessionsCollectionStandalone>();
default:
MONGO_UNREACHABLE;
}
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index 381801cecab..436f0af2267 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/service_context_noop.h"
#include "mongo/db/service_liason_mock.h"
#include "mongo/db/sessions_collection_mock.h"
#include "mongo/stdx/future.h"
@@ -42,8 +44,7 @@
namespace mongo {
namespace {
-const Milliseconds kSessionTimeout =
- duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultTimeout);
+const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout);
const Milliseconds kForceRefresh =
duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultRefresh);
@@ -60,6 +61,11 @@ public:
_sessions(std::make_shared<MockSessionsCollectionImpl>()) {}
void setUp() override {
+ auto client = serviceContext.makeClient("testClient");
+ _opCtx = client->makeOperationContext();
+ _client = client.get();
+ Client::setCurrent(std::move(client));
+
auto mockService = stdx::make_unique<MockServiceLiason>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache =
@@ -67,7 +73,12 @@ public:
}
void tearDown() override {
+ if (_opCtx) {
+ _opCtx.reset();
+ }
+
_service->join();
+ auto client = Client::releaseCurrent();
}
void waitUntilRefreshScheduled() {
@@ -88,11 +99,32 @@ public:
return _sessions;
}
+ void setOpCtx() {
+ _opCtx = client()->makeOperationContext();
+ }
+
+ void clearOpCtx() {
+ _opCtx.reset();
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+ Client* client() {
+ return _client;
+ }
+
private:
+ ServiceContextNoop serviceContext;
+ ServiceContext::UniqueOperationContext _opCtx;
+
std::shared_ptr<MockServiceLiasonImpl> _service;
std::shared_ptr<MockSessionsCollectionImpl> _sessions;
std::unique_ptr<LogicalSessionCache> _cache;
+
+ Client* _client;
};
// Test that session cache fetches new records from the sessions collection
@@ -100,12 +132,12 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) {
auto lsid = makeLogicalSessionIdForTest();
// When the record is not present (and not in the sessions collection) returns an error
- auto res = cache()->fetchAndPromote(lsid);
+ auto res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(!res.isOK());
// When the record is not present (but is in the sessions collection) returns it
sessions()->add(makeLogicalSessionRecord(lsid, service()->now()));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// When the record is present in the cache, returns it
@@ -115,7 +147,7 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) {
return {ErrorCodes::NoSuchSession, "no such session"};
});
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
}
@@ -133,7 +165,7 @@ TEST_F(LogicalSessionCacheTest, TestCacheHitsOnly) {
ASSERT(!res.isOK());
// When the record is present, returns the owner
- cache()->fetchAndPromote(lsid).transitional_ignore();
+ cache()->fetchAndPromote(opCtx(), lsid).transitional_ignore();
res = cache()->promote(lsid);
ASSERT(res.isOK());
}
@@ -150,17 +182,17 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) {
// Fast forward time and fetch
service()->fastForward(Milliseconds(500));
ASSERT(start != service()->now());
- auto res = cache()->fetchAndPromote(lsid);
+ auto res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// Now that we fetched, lifetime of session should be extended
service()->fastForward(kSessionTimeout - Milliseconds(500));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// We fetched again, so lifetime extended again
service()->fastForward(kSessionTimeout - Milliseconds(10));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// Fast forward and hit-only fetch
@@ -181,56 +213,63 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) {
// Test the startSession method
TEST_F(LogicalSessionCacheTest, StartSession) {
- auto lsid = makeLogicalSessionIdForTest();
+ auto record = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now());
+ auto lsid = record.getId();
// Test starting a new session
- auto res = cache()->startSession(lsid);
+ auto res = cache()->startSession(opCtx(), record);
ASSERT(res.isOK());
+
+ // Record will not be in the collection yet; refresh must happen first.
+ ASSERT(!sessions()->has(lsid));
+
+ // Do refresh, cached records should get flushed to collection.
+ clearOpCtx();
+ cache()->refreshNow(client());
ASSERT(sessions()->has(lsid));
- // Try to start a session that is already in the sessions collection and our
- // local cache, should fail
- res = cache()->startSession(lsid);
- ASSERT(!res.isOK());
+ // Try to start the same session again, should succeed.
+ res = cache()->startSession(opCtx(), record);
+ ASSERT(res.isOK());
// Try to start a session that is already in the sessions collection but
- // is not in our local cache, should fail
+ // is not in our local cache, should succeed.
auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now());
- auto lsid2 = record2.getId();
- sessions()->add(std::move(record2));
- res = cache()->startSession(lsid2);
- ASSERT(!res.isOK());
+ sessions()->add(record2);
+ res = cache()->startSession(opCtx(), record2);
+ ASSERT(res.isOK());
// Try to start a session that has expired from our cache, and is no
// longer in the sessions collection, should succeed
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
sessions()->remove(lsid);
ASSERT(!sessions()->has(lsid));
- res = cache()->startSession(lsid);
+ res = cache()->startSession(opCtx(), record);
ASSERT(res.isOK());
- ASSERT(sessions()->has(lsid));
}
// Test that records in the cache are properly refreshed until they expire
TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
// Insert two records into the cache
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- cache()->startSession(lsid2).transitional_ignore();
+ auto record1 = makeLogicalSessionRecordForTest();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record1).transitional_ignore();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
stdx::promise<int> hitRefresh;
auto refreshFuture = hitRefresh.get_future();
// Advance time to first refresh point, check that refresh happens, and
// that it includes both our records
- sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) {
+ sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) {
hitRefresh.set_value(sessions.size());
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Wait for the refresh to happen
+ clearOpCtx();
service()->fastForward(kForceRefresh);
+ cache()->refreshNow(client());
refreshFuture.wait();
ASSERT_EQ(refreshFuture.get(), 2);
@@ -240,98 +279,39 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
auto refresh2Future = refresh2.get_future();
// Use one of the records
- auto res = cache()->fetchAndPromote(lsid1);
+ setOpCtx();
+ auto res = cache()->fetchAndPromote(opCtx(), record1.getId());
ASSERT(res.isOK());
// Advance time so that one record expires
// Ensure that first record was refreshed, and second was thrown away
- sessions()->setRefreshHook([&refresh2](LogicalSessionIdSet sessions) {
+ sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) {
// We should only have one record here, the other should have expired
ASSERT_EQ(sessions.size(), size_t(1));
- refresh2.set_value(*(sessions.begin()));
- return LogicalSessionIdSet{};
+ refresh2.set_value(sessions.begin()->getId());
+ return Status::OK();
});
- // Wait until the second job has been scheduled
- waitUntilRefreshScheduled();
-
+ clearOpCtx();
service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1));
+ cache()->refreshNow(client());
refresh2Future.wait();
- ASSERT_EQ(refresh2Future.get(), lsid1);
-}
-
-// Test that cache deletes records that fail to refresh
-TEST_F(LogicalSessionCacheTest, CacheDeletesRecordsThatFailToRefresh) {
- // Put two sessions into the cache
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Record 1 fails to refresh
- sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) {
- ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{lsid1};
- });
-
- // Force a refresh
- service()->fastForward(kForceRefresh);
- refreshFuture.wait();
-
- // Ensure that one record is still there and the other is gone
- auto res = cache()->promote(lsid1);
- ASSERT(!res.isOK());
- res = cache()->promote(lsid2);
- ASSERT(res.isOK());
-}
-
-// Test that we don't remove records that fail to refresh if they are active on the service
-TEST_F(LogicalSessionCacheTest, KeepActiveSessionAliveEvenIfRefreshFails) {
- // Put two sessions into the cache, one into the service
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- service()->add(lsid1);
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // SignedLsid 1 fails to refresh
- sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) {
- ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{lsid1};
- });
-
- // Force a refresh
- service()->fastForward(kForceRefresh);
- refreshFuture.wait();
-
- // Ensure that both lsids are still there
- auto res = cache()->promote(lsid1);
- ASSERT(res.isOK());
- res = cache()->promote(lsid2);
- ASSERT(res.isOK());
+ ASSERT_EQ(refresh2Future.get(), record1.getId());
}
// Test that session cache properly expires lsids after 30 minutes of no use
TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) {
// Insert a lsid
- auto lsid = makeLogicalSessionIdForTest();
- cache()->startSession(lsid).transitional_ignore();
- auto res = cache()->promote(lsid);
+ auto record = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record).transitional_ignore();
+ auto res = cache()->promote(record.getId());
ASSERT(res.isOK());
// Force it to expire
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
// Check that it is no longer in the cache
- res = cache()->promote(lsid);
+ res = cache()->promote(record.getId());
ASSERT(!res.isOK());
}
@@ -342,47 +322,31 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) {
// Insert one active lsid on the service, none in the cache
service()->add(lsid);
- stdx::mutex mutex;
- stdx::condition_variable cv;
int count = 0;
- sessions()->setRefreshHook([&cv, &mutex, &count, &lsid](LogicalSessionIdSet sessions) {
- ASSERT_EQ(*(sessions.begin()), lsid);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
-
- return LogicalSessionIdSet{};
+ sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) {
+ ASSERT_EQ(sessions.size(), size_t(1));
+ ASSERT_EQ(sessions.begin()->getId(), lsid);
+ count++;
+ return Status::OK();
});
+ clearOpCtx();
+
// Force a refresh, it should refresh our active session
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 1; });
- }
-
- // Wait until the next job has been scheduled
- waitUntilRefreshScheduled();
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 1);
// Force a session timeout, session is still on the service
service()->fastForward(kSessionTimeout);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 2; });
- }
-
- // Wait until the next job has been scheduled
- waitUntilRefreshScheduled();
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 2);
// Force another refresh, check that it refreshes that active lsid again
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 3; });
- }
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 3);
}
// Test that the set of lsids we refresh is a sum of cached + active lsids
@@ -390,45 +354,39 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) {
// Put one session into the cache, one into the service
auto lsid1 = makeLogicalSessionIdForTest();
service()->add(lsid1);
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
- // Both lsids refresh
- sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) {
+ // Both signedLsids refresh
+ sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test large sets of cache-only session lsids
TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) {
int count = LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
for (int i = 0; i < count; i++) {
- auto lsid = makeLogicalSessionIdForTest();
- cache()->startSession(lsid).transitional_ignore();
+ auto record = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record).transitional_ignore();
}
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Check that all lsids refresh
- sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) {
+ // Check that all signedLsids refresh
+ sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(count));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test larger sets of service-only session lsids
@@ -439,19 +397,16 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) {
service()->add(lsid);
}
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Check that all lsids refresh
- sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) {
+ // Check that all signedLsids refresh
+ sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(count));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test larger mixed sets of cache/service active sessions
@@ -461,77 +416,44 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) {
auto lsid = makeLogicalSessionIdForTest();
service()->add(lsid);
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid2).transitional_ignore();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
}
- stdx::mutex mutex;
- stdx::condition_variable cv;
- int refreshes = 0;
int nRefreshed = 0;
// Check that all lsids refresh successfully
- sessions()->setRefreshHook(
- [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- refreshes++;
- nRefreshed = sessions.size();
- }
- cv.notify_all();
-
- return LogicalSessionIdSet{};
- });
+ sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
+ nRefreshed = sessions.size();
+ return Status::OK();
+ });
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 1; });
- }
+ cache()->refreshNow(client());
ASSERT_EQ(nRefreshed, count * 2);
// Remove all of the service sessions, should just refresh the cache entries
- // (and make all but one fail to refresh)
service()->clear();
- sessions()->setRefreshHook(
- [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- refreshes++;
- nRefreshed = sessions.size();
- }
- cv.notify_all();
-
- sessions.erase(sessions.begin());
- return sessions;
- });
-
- // Wait for job to be scheduled
- waitUntilRefreshScheduled();
+ sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
+ nRefreshed = sessions.size();
+ return Status::OK();
+ });
// Force another refresh
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 2; });
- }
+ cache()->refreshNow(client());
// We should not have refreshed any sessions from the service, only the cache
ASSERT_EQ(nRefreshed, count);
- // Wait for job to be scheduled
- waitUntilRefreshScheduled();
-
// Force a third refresh
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 3; });
- }
+ cache()->refreshNow(client());
- // Since all but one lsid failed to refresh, third set should just have one lsid
- ASSERT_EQ(nRefreshed, 1);
+ // Again, we should have only refreshed sessions from the cache
+ ASSERT_EQ(nRefreshed, count);
}
} // namespace
diff --git a/src/mongo/db/logical_session_id.cpp b/src/mongo/db/logical_session_id.cpp
index ad1dd8a496d..3f59f19f766 100644
--- a/src/mongo/db/logical_session_id.cpp
+++ b/src/mongo/db/logical_session_id.cpp
@@ -1,41 +1,42 @@
/**
- * Copyright (C) 2017 MongoDB Inc.
+ * Copyright (C) 2017 MongoDB Inc.
*
- * This program is free software: you can redistribute it and/or modify
- * it under the terms of the GNU Affero General Public License, version 3,
- * as published by the Free Software Foundation.
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
*
- * This program is distributed in the hope that it will be useful,
- * but WITHOUT ANY WARRANTY; without even the implied warranty of
- * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU Affero General Public License for more details.
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
*
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
*
- * As a special exception, the copyright holders give permission to link the
- * code of portions of this program with the OpenSSL library under certain
- * conditions as described in each individual source file and distribute
- * linked combinations including the program with the OpenSSL library. You
- * must comply with the GNU Affero General Public License in all respects for
- * all of the code used other than as permitted herein. If you modify file(s)
- * with this exception, you may extend this exception to your version of the
- * file(s), but you are not obligated to do so. If you do not wish to do so,
- * delete this exception statement from your version. If you delete this
- * exception statement from all source files in the program, then also delete
- * it in the license file.
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/logical_session_id.h"
-
-#include "mongo/bson/bsonobjbuilder.h"
-#include "mongo/db/logical_session_cache.h"
-#include "mongo/util/assert_util.h"
+#include "mongo/db/server_parameters.h"
namespace mongo {
+MONGO_EXPORT_STARTUP_SERVER_PARAMETER(localLogicalSessionTimeoutMinutes,
+ int,
+ kLogicalSessionDefaultTimeout.count());
+
LogicalSessionId makeLogicalSessionIdForTest() {
LogicalSessionId lsid;
@@ -45,4 +46,12 @@ LogicalSessionId makeLogicalSessionIdForTest() {
return lsid;
}
+LogicalSessionRecord makeLogicalSessionRecordForTest() {
+ LogicalSessionRecord record{};
+
+ record.setId(makeLogicalSessionIdForTest());
+
+ return record;
+}
+
} // namespace mongo
diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h
index 4984a57cc30..37937d79a56 100644
--- a/src/mongo/db/logical_session_id.h
+++ b/src/mongo/db/logical_session_id.h
@@ -44,9 +44,11 @@ const StmtId kUninitializedStmtId = -1;
const TxnNumber kUninitializedTxnNumber = -1;
class BSONObjBuilder;
-
class OperationContext;
+const Minutes kLogicalSessionDefaultTimeout = Minutes(30);
+extern int localLogicalSessionTimeoutMinutes;
+
inline bool operator==(const LogicalSessionId& lhs, const LogicalSessionId& rhs) {
auto makeEqualityLens = [](const auto& lsid) { return std::tie(lsid.getId(), lsid.getUid()); };
@@ -67,6 +69,8 @@ inline bool operator!=(const LogicalSessionRecord& lhs, const LogicalSessionReco
LogicalSessionId makeLogicalSessionIdForTest();
+LogicalSessionRecord makeLogicalSessionRecordForTest();
+
struct LogicalSessionIdHash {
std::size_t operator()(const LogicalSessionId& lsid) const {
return _hasher(lsid.getId());
@@ -106,5 +110,6 @@ inline StringBuilder& operator<<(StringBuilder& s, const LogicalSessionFromClien
* An alias for sets of session ids.
*/
using LogicalSessionIdSet = stdx::unordered_set<LogicalSessionId, LogicalSessionIdHash>;
+using LogicalSessionRecordSet = stdx::unordered_set<LogicalSessionRecord, LogicalSessionRecordHash>;
} // namespace mongo
diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl
index ccedd28e6f6..c540c3c2174 100644
--- a/src/mongo/db/logical_session_id.idl
+++ b/src/mongo/db/logical_session_id.idl
@@ -64,12 +64,27 @@ structs:
id: LogicalSessionIdToClient
timeoutMinutes: int
+ UserNameWithId:
+ description: "A struct representing the combination of a UserName and an id"
+ strict: true
+ fields:
+ name:
+ type: string
+ id:
+ type: objectid
+ optional: true
+
LogicalSessionRecord:
description: "A struct representing a LogicalSessionRecord"
strict: true
fields:
- id: LogicalSessionId
+ _id:
+ type: LogicalSessionId
+ cpp_name: id
lastUse: date
+ user:
+ type: UserNameWithId
+ optional: true
LogicalSessionFromClient:
description: "A struct representing a LogicalSessionId from external clients"
diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp
index fbed0134c83..ae68e5037bf 100644
--- a/src/mongo/db/logical_session_id_helpers.cpp
+++ b/src/mongo/db/logical_session_id_helpers.cpp
@@ -52,22 +52,7 @@ SHA256Block lookupUserDigest(OperationContext* opCtx) {
if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) {
UserName userName;
- auto authzSession = AuthorizationSession::get(client);
- auto userNameItr = authzSession->getAuthenticatedUserNames();
- if (userNameItr.more()) {
- userName = userNameItr.next();
- if (userNameItr.more()) {
- uasserted(ErrorCodes::Unauthorized,
- "must only be authenticated as exactly one user "
- "to create a logical session");
- }
- } else {
- uasserted(ErrorCodes::Unauthorized,
- "must only be authenticated as exactly one user "
- "to create a logical session");
- }
-
- User* user = authzSession->lookupUser(userName);
+ auto user = AuthorizationSession::get(client)->getSingleUser();
invariant(user);
return user->getDigest();
@@ -109,8 +94,36 @@ LogicalSessionId makeLogicalSessionId(OperationContext* opCtx) {
return id;
}
+LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, Date_t lastUse) {
+ LogicalSessionId id{};
+ LogicalSessionRecord lsr{};
+
+ auto client = opCtx->getClient();
+ ServiceContext* serviceContext = client->getServiceContext();
+ if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) {
+ auto user = AuthorizationSession::get(client)->getSingleUser();
+ invariant(user);
+
+ id.setUid(user->getDigest());
+
+ UserNameWithId userDoc{};
+ userDoc.setName(StringData(user->getName().toString()));
+ userDoc.setId(user->getID());
+ lsr.setUser(userDoc);
+ } else {
+ id.setUid(kNoAuthDigest);
+ }
+
+ id.setId(UUID::gen());
+
+ lsr.setId(id);
+ lsr.setLastUse(lastUse);
+
+ return lsr;
+}
+
LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date_t lastUse) {
- LogicalSessionRecord lsr;
+ LogicalSessionRecord lsr{};
lsr.setId(lsid);
lsr.setLastUse(lastUse);
@@ -118,11 +131,35 @@ LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date
return lsr;
}
+LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ Date_t lastUse) {
+ auto lsr = makeLogicalSessionRecord(lsid, lastUse);
+
+ auto client = opCtx->getClient();
+ ServiceContext* serviceContext = client->getServiceContext();
+ if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) {
+ auto user = AuthorizationSession::get(client)->getSingleUser();
+ invariant(user);
+
+ if (user->getDigest() == lsid.getUid()) {
+ UserNameWithId userDoc{};
+ userDoc.setName(StringData(user->getName().toString()));
+ userDoc.setId(user->getID());
+ lsr.setUser(userDoc);
+ }
+ }
+
+ return lsr;
+}
+
+
LogicalSessionToClient makeLogicalSessionToClient(const LogicalSessionId& lsid) {
LogicalSessionIdToClient lsitc;
lsitc.setId(lsid.getId());
LogicalSessionToClient id;
+
id.setId(lsitc);
id.setTimeoutMinutes(localLogicalSessionTimeoutMinutes);
diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h
index 5f1ff3c854e..e7cf637d984 100644
--- a/src/mongo/db/logical_session_id_helpers.h
+++ b/src/mongo/db/logical_session_id_helpers.h
@@ -32,10 +32,24 @@
namespace mongo {
+/**
+ * Factory functions to generate logical session records.
+ */
LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid,
OperationContext* opCtx);
LogicalSessionId makeLogicalSessionId(OperationContext* opCtx);
+
+/**
+ * Factory functions to make logical session records. The overloads that
+ * take an OperationContext should be used when possible, as they will also set the
+ * user information on the record.
+ */
LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date_t lastUse);
+LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, Date_t lastUse);
+LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ Date_t lastUse);
+
LogicalSessionToClient makeLogicalSessionToClient(const LogicalSessionId& lsid);
/**
diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp
index 44fbc2fe003..35fbde965dd 100644
--- a/src/mongo/db/namespace_string.cpp
+++ b/src/mongo/db/namespace_string.cpp
@@ -115,6 +115,8 @@ bool NamespaceString::isLegalClientSystemNS() const {
return true;
if (ns() == "admin.system.backup_users")
return true;
+ if (ns() == "admin.system.sessions")
+ return true;
}
if (ns() == "local.system.replset")
return true;
diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp
index 0eccd40f3db..234aae0565d 100644
--- a/src/mongo/db/s/migration_util.cpp
+++ b/src/mongo/db/s/migration_util.cpp
@@ -62,4 +62,5 @@ BSONObj makeMigrationStatusDocument(const NamespaceString& nss,
}
} // namespace migrationutil
+
} // namespace mongo
diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp
index 4c2ba699acb..821de302e02 100644
--- a/src/mongo/db/service_liason_mock.cpp
+++ b/src/mongo/db/service_liason_mock.cpp
@@ -56,7 +56,8 @@ Date_t MockServiceLiasonImpl::now() const {
}
void MockServiceLiasonImpl::scheduleJob(PeriodicRunner::PeriodicJob job) {
- _runner->scheduleJob(std::move(job));
+ // The cache should be refreshed from tests by calling refreshNow().
+ return;
}
void MockServiceLiasonImpl::add(LogicalSessionId lsid) {
diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp
index 1838d0cfe0e..b5c66195afb 100644
--- a/src/mongo/db/sessions_collection.cpp
+++ b/src/mongo/db/sessions_collection.cpp
@@ -30,8 +30,125 @@
#include "mongo/db/sessions_collection.h"
+#include <memory>
+
+#include "mongo/bson/bsonobjbuilder.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/stdx/functional.h"
+#include "mongo/stdx/memory.h"
+
namespace mongo {
+namespace {
+
+BSONObj lsidQuery(const LogicalSessionId& lsid) {
+ return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON());
+}
+
+BSONObj lsidQuery(const LogicalSessionRecord& record) {
+ return lsidQuery(record.getId());
+}
+
+BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) {
+ // { $max : { lastUse : <time> }, $setOnInsert : { user : <user> } }
+
+ // Build our update doc.
+ BSONObjBuilder updateBuilder;
+
+ {
+ BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$max"));
+ maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, refreshTime);
+ }
+
+ if (record.getUser()) {
+ BSONObjBuilder setBuilder(updateBuilder.subobjStart("$setOnInsert"));
+ setBuilder.append(LogicalSessionRecord::kUserFieldName, record.getUser()->toBSON());
+ }
+
+ return updateBuilder.obj();
+}
+
+template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container>
+Status runBulkCmd(StringData label,
+ InitBatchFn&& initBatch,
+ AddLineFn&& addLine,
+ SendBatchFn&& sendBatch,
+ const Container& items) {
+ int i = 0;
+ BufBuilder buf;
+
+ boost::optional<BSONObjBuilder> batchBuilder;
+ boost::optional<BSONArrayBuilder> entries;
+
+ auto setupBatchBuilder = [&] {
+ buf.reset();
+ batchBuilder.emplace(buf);
+ initBatch(&(batchBuilder.get()));
+ entries.emplace(batchBuilder->subarrayStart(label));
+ };
+
+ auto sendLocalBatch = [&] {
+ entries->done();
+ return sendBatch(batchBuilder->done());
+ };
+
+ setupBatchBuilder();
+
+ for (const auto& item : items) {
+ addLine(&(entries.get()), item);
+
+ if (++i >= 1000) {
+ auto res = sendLocalBatch();
+ if (!res.isOK()) {
+ return res;
+ }
+
+ setupBatchBuilder();
+ i = 0;
+ }
+ }
+
+ return sendLocalBatch();
+}
+
+} // namespace
+
+
+constexpr StringData SessionsCollection::kSessionsDb;
+constexpr StringData SessionsCollection::kSessionsCollection;
+constexpr StringData SessionsCollection::kSessionsFullNS;
+
+
SessionsCollection::~SessionsCollection() = default;
+Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime,
+ SendBatchFn send) {
+ auto init = [](BSONObjBuilder* batch) {
+ batch->append("update", kSessionsCollection);
+ batch->append("ordered", false);
+ };
+
+ auto add = [&refreshTime](BSONArrayBuilder* entries, const LogicalSessionRecord& record) {
+ entries->append(BSON("q" << lsidQuery(record) << "u" << updateQuery(record, refreshTime)
+ << "upsert"
+ << true));
+ };
+
+ return runBulkCmd("updates", init, add, send, sessions);
+}
+
+Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) {
+ auto init = [](BSONObjBuilder* batch) {
+ batch->append("delete", kSessionsCollection);
+ batch->append("ordered", false);
+ };
+
+ auto add = [](BSONArrayBuilder* builder, const LogicalSessionId& lsid) {
+ builder->append(BSON("q" << lsidQuery(lsid) << "limit" << 0));
+ };
+
+ return runBulkCmd("deletes", init, add, send, sessions);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h
index 4a7503a293e..e1cda80934f 100644
--- a/src/mongo/db/sessions_collection.h
+++ b/src/mongo/db/sessions_collection.h
@@ -29,9 +29,14 @@
#pragma once
#include "mongo/db/logical_session_id.h"
+#include "mongo/stdx/functional.h"
namespace mongo {
+class BSONArrayBuilder;
+class BSONObjBuilder;
+class OperationContext;
+
/**
* An abstract interface describing the entrypoint into the sessions collection.
*
@@ -42,37 +47,50 @@ class SessionsCollection {
public:
virtual ~SessionsCollection();
+ static constexpr StringData kSessionsDb = "admin"_sd;
+ static constexpr StringData kSessionsCollection = "system.sessions"_sd;
+ static constexpr StringData kSessionsFullNS = "admin.system.sessions"_sd;
+
/**
* Returns a LogicalSessionRecord for the given session id. This method
* may run networking operations on the calling thread.
*/
- virtual StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id) = 0;
-
- /**
- * Inserts the given record into the sessions collection. This method may run
- * networking operations on the calling thread.
- *
- * Returns a DuplicateSession error if the session already exists in the
- * sessions collection.
- */
- virtual Status insertRecord(LogicalSessionRecord record) = 0;
+ virtual StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx,
+ const LogicalSessionId& id) = 0;
/**
* Updates the last-use times on the given sessions to be greater than
- * or equal to the current time.
+ * or equal to the given time.
*
* Returns a list of sessions for which no authoritative record was found,
- * and hence were not refreshed.
+ * and hence were not refreshed. Returns an error if a networking issue occurred.
*/
- virtual LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions) = 0;
+ virtual Status refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) = 0;
/**
* Removes the authoritative records for the specified sessions.
*
* Implementations should perform authentication checks to ensure that
* session records may only be removed if their owner is logged in.
+ *
+ * Returns an error if the removal fails, for example from a network error.
+ */
+ virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0;
+
+protected:
+ using SendBatchFn = stdx::function<Status(BSONObj batch)>;
+
+ /**
+ * Formats and sends batches of refreshes for the given set of sessions.
+ */
+ Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send);
+
+ /**
+ * Formats and sends batches of deletes for the given set of sessions.
*/
- virtual void removeRecords(LogicalSessionIdSet sessions) = 0;
+ Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send);
};
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp
index 832213e1223..09deaaa3b1f 100644
--- a/src/mongo/db/sessions_collection_mock.cpp
+++ b/src/mongo/db/sessions_collection_mock.cpp
@@ -35,7 +35,6 @@ namespace mongo {
MockSessionsCollectionImpl::MockSessionsCollectionImpl()
: _sessions(),
_fetch(stdx::bind(&MockSessionsCollectionImpl::_fetchRecord, this, stdx::placeholders::_1)),
- _insert(stdx::bind(&MockSessionsCollectionImpl::_insertRecord, this, stdx::placeholders::_1)),
_refresh(
stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1)),
_remove(
@@ -45,10 +44,6 @@ void MockSessionsCollectionImpl::setFetchHook(FetchHook hook) {
_fetch = std::move(hook);
}
-void MockSessionsCollectionImpl::setInsertHook(InsertHook hook) {
- _insert = std::move(hook);
-}
-
void MockSessionsCollectionImpl::setRefreshHook(RefreshHook hook) {
_refresh = std::move(hook);
}
@@ -59,26 +54,22 @@ void MockSessionsCollectionImpl::setRemoveHook(RemoveHook hook) {
void MockSessionsCollectionImpl::clearHooks() {
_fetch = stdx::bind(&MockSessionsCollectionImpl::_fetchRecord, this, stdx::placeholders::_1);
- _insert = stdx::bind(&MockSessionsCollectionImpl::_insertRecord, this, stdx::placeholders::_1);
_refresh =
stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1);
_remove = stdx::bind(&MockSessionsCollectionImpl::_removeRecords, this, stdx::placeholders::_1);
}
-StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::fetchRecord(LogicalSessionId id) {
- return _fetch(std::move(id));
-}
-
-Status MockSessionsCollectionImpl::insertRecord(LogicalSessionRecord record) {
- return _insert(std::move(record));
+StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::fetchRecord(
+ const LogicalSessionId& id) {
+ return _fetch(id);
}
-LogicalSessionIdSet MockSessionsCollectionImpl::refreshSessions(LogicalSessionIdSet sessions) {
- return _refresh(std::move(sessions));
+Status MockSessionsCollectionImpl::refreshSessions(const LogicalSessionRecordSet& sessions) {
+ return _refresh(sessions);
}
-void MockSessionsCollectionImpl::removeRecords(LogicalSessionIdSet sessions) {
- _remove(std::move(sessions));
+Status MockSessionsCollectionImpl::removeRecords(const LogicalSessionIdSet& sessions) {
+ return _remove(std::move(sessions));
}
void MockSessionsCollectionImpl::add(LogicalSessionRecord record) {
@@ -105,7 +96,8 @@ const MockSessionsCollectionImpl::SessionMap& MockSessionsCollectionImpl::sessio
return _sessions;
}
-StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord(LogicalSessionId id) {
+StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord(
+ const LogicalSessionId& id) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
// If we do not have this record, return an error
@@ -117,40 +109,22 @@ StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord(Logica
return it->second;
}
-Status MockSessionsCollectionImpl::_insertRecord(LogicalSessionRecord record) {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- auto res = _sessions.insert({record.getId(), std::move(record)});
-
- // We should never try to insert the same record twice. In theory this could
- // happen because of a UUID conflict.
- if (!res.second) {
- return {ErrorCodes::DuplicateSession, "Session already exists in the sessions collection"};
- }
-
- return Status::OK();
-}
-
-LogicalSessionIdSet MockSessionsCollectionImpl::_refreshSessions(LogicalSessionIdSet sessions) {
- LogicalSessionIdSet notFound{};
-
- {
- stdx::unique_lock<stdx::mutex> lk(_mutex);
- for (auto& lsid : sessions) {
- auto it = _sessions.find(lsid);
- if (it == _sessions.end()) {
- notFound.insert(lsid);
- }
+Status MockSessionsCollectionImpl::_refreshSessions(const LogicalSessionRecordSet& sessions) {
+ for (auto& record : sessions) {
+ if (!has(record.getId())) {
+ _sessions.insert({record.getId(), record});
}
}
-
- return notFound;
+ return Status::OK();
}
-void MockSessionsCollectionImpl::_removeRecords(LogicalSessionIdSet sessions) {
+Status MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& sessions) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
for (auto& lsid : sessions) {
_sessions.erase(lsid);
}
+
+ return Status::OK();
}
} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h
index 870d0434991..5456e0731a2 100644
--- a/src/mongo/db/sessions_collection_mock.h
+++ b/src/mongo/db/sessions_collection_mock.h
@@ -58,14 +58,12 @@ public:
MockSessionsCollectionImpl();
- using FetchHook = stdx::function<StatusWith<LogicalSessionRecord>(LogicalSessionId)>;
- using InsertHook = stdx::function<Status(LogicalSessionRecord)>;
- using RefreshHook = stdx::function<LogicalSessionIdSet(LogicalSessionIdSet)>;
- using RemoveHook = stdx::function<void(LogicalSessionIdSet)>;
+ using FetchHook = stdx::function<StatusWith<LogicalSessionRecord>(const LogicalSessionId&)>;
+ using RefreshHook = stdx::function<Status(const LogicalSessionRecordSet&)>;
+ using RemoveHook = stdx::function<Status(const LogicalSessionIdSet&)>;
// Set custom hooks to override default behavior
void setFetchHook(FetchHook hook);
- void setInsertHook(InsertHook hook);
void setRefreshHook(RefreshHook hook);
void setRemoveHook(RemoveHook hook);
@@ -73,10 +71,9 @@ public:
void clearHooks();
// Forwarding methods from the MockSessionsCollection
- StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id);
- Status insertRecord(LogicalSessionRecord record);
- LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions);
- void removeRecords(LogicalSessionIdSet sessions);
+ StatusWith<LogicalSessionRecord> fetchRecord(const LogicalSessionId& id);
+ Status refreshSessions(const LogicalSessionRecordSet& sessions);
+ Status removeRecords(const LogicalSessionIdSet& sessions);
// Test-side methods that operate on the _sessions map
void add(LogicalSessionRecord record);
@@ -87,16 +84,14 @@ public:
private:
// Default implementations, may be overridden with custom hooks.
- StatusWith<LogicalSessionRecord> _fetchRecord(LogicalSessionId id);
- Status _insertRecord(LogicalSessionRecord record);
- LogicalSessionIdSet _refreshSessions(LogicalSessionIdSet sessions);
- void _removeRecords(LogicalSessionIdSet sessions);
+ StatusWith<LogicalSessionRecord> _fetchRecord(const LogicalSessionId& id);
+ Status _refreshSessions(const LogicalSessionRecordSet& sessions);
+ Status _removeRecords(const LogicalSessionIdSet& sessions);
stdx::mutex _mutex;
SessionMap _sessions;
FetchHook _fetch;
- InsertHook _insert;
RefreshHook _refresh;
RemoveHook _remove;
};
@@ -111,20 +106,19 @@ public:
explicit MockSessionsCollection(std::shared_ptr<MockSessionsCollectionImpl> impl)
: _impl(std::move(impl)) {}
- StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id) override {
- return _impl->fetchRecord(std::move(id));
+ StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx,
+ const LogicalSessionId& id) override {
+ return _impl->fetchRecord(id);
}
- Status insertRecord(LogicalSessionRecord record) override {
- return _impl->insertRecord(std::move(record));
+ Status refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) override {
+ return _impl->refreshSessions(sessions);
}
- LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions) override {
- return _impl->refreshSessions(std::move(sessions));
- }
-
- void removeRecords(LogicalSessionIdSet sessions) override {
- return _impl->removeRecords(std::move(sessions));
+ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override {
+ return _impl->removeRecords(sessions);
}
private:
diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp
new file mode 100644
index 00000000000..093b61ef959
--- /dev/null
+++ b/src/mongo/db/sessions_collection_standalone.cpp
@@ -0,0 +1,91 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects
+ * for all of the code used other than as permitted herein. If you modify
+ * file(s) with this exception, you may extend this exception to your
+ * version of the file(s), but you are not obligated to do so. If you do not
+ * wish to do so, delete this exception statement from your version. If you
+ * delete this exception statement from all source files in the program,
+ * then also delete it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include "mongo/db/sessions_collection_standalone.h"
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/client/query.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/operation_context.h"
+
+namespace mongo {
+
+namespace {
+
+BSONObj lsidQuery(const LogicalSessionId& lsid) {
+ return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON());
+}
+} // namespace
+
+StatusWith<LogicalSessionRecord> SessionsCollectionStandalone::fetchRecord(
+ OperationContext* opCtx, const LogicalSessionId& lsid) {
+ DBDirectClient client(opCtx);
+ auto cursor = client.query(kSessionsFullNS.toString(), lsidQuery(lsid), 1);
+ if (!cursor->more()) {
+ return {ErrorCodes::NoSuchSession, "No matching record in the sessions collection"};
+ }
+
+ try {
+ IDLParserErrorContext ctx("LogicalSessionRecord");
+ return LogicalSessionRecord::parse(ctx, cursor->next());
+ } catch (...) {
+ return exceptionToStatus();
+ }
+}
+
+Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) {
+ DBDirectClient client(opCtx);
+ return doRefresh(sessions, refreshTime, makeSendFn(&client));
+}
+
+Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx,
+ const LogicalSessionIdSet& sessions) {
+ DBDirectClient client(opCtx);
+ return doRemove(sessions, makeSendFn(&client));
+}
+
+SessionsCollection::SendBatchFn SessionsCollectionStandalone::makeSendFn(DBDirectClient* client) {
+ auto send = [client](BSONObj batch) -> Status {
+ BSONObj res;
+ auto ok = client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res);
+ if (!ok) {
+ return {ErrorCodes::UnknownError,
+ client->getLastError(SessionsCollection::kSessionsDb.toString())};
+ }
+ return Status::OK();
+ };
+
+ return send;
+}
+
+
+} // namespace mongo
diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h
new file mode 100644
index 00000000000..0fee4cbd164
--- /dev/null
+++ b/src/mongo/db/sessions_collection_standalone.h
@@ -0,0 +1,69 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#pragma once
+
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/util/time_support.h"
+
+namespace mongo {
+
+class DBDirectClient;
+class OperationContext;
+
+/**
+ * Accesses the sessions collection directly for standalone servers.
+ */
+class SessionsCollectionStandalone : public SessionsCollection {
+public:
+ /**
+ * Returns a LogicalSessionRecord for the given session id, or an error if
+ * no such record was found.
+ */
+ StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx,
+ const LogicalSessionId& lsid) override;
+
+ /**
+ * Updates the last-use times on the given sessions to be greater than
+ * or equal to the current time.
+ */
+ Status refreshSessions(OperationContext* opCtx,
+ const LogicalSessionRecordSet& sessions,
+ Date_t refreshTime) override;
+
+ /**
+ * Removes the authoritative records for the specified sessions.
+ */
+ Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override;
+
+private:
+ SessionsCollection::SendBatchFn makeSendFn(DBDirectClient* client);
+};
+
+} // namespace mongo
diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript
index 74383934018..18b9e406ea6 100644
--- a/src/mongo/dbtests/SConscript
+++ b/src/mongo/dbtests/SConscript
@@ -71,6 +71,7 @@ dbtest = env.Program(
'jsobjtests.cpp',
'jsontests.cpp',
'jstests.cpp',
+ 'logical_sessions_tests.cpp',
'matchertests.cpp',
'mmaptests.cpp',
'mock_dbclient_conn_test.cpp',
@@ -126,6 +127,7 @@ dbtest = env.Program(
"$BUILD_DIR/mongo/db/repl/replication_consistency_markers_impl",
"$BUILD_DIR/mongo/db/repl/replmocks",
"$BUILD_DIR/mongo/db/serveronly",
+ "$BUILD_DIR/mongo/db/sessions_collection_standalone",
"$BUILD_DIR/mongo/db/logical_clock",
"$BUILD_DIR/mongo/db/logical_time_metadata_hook",
"$BUILD_DIR/mongo/db/storage/mmap_v1/paths",
diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp
new file mode 100644
index 00000000000..f642cf5e377
--- /dev/null
+++ b/src/mongo/dbtests/logical_sessions_tests.cpp
@@ -0,0 +1,196 @@
+/**
+ * Copyright (C) 2017 MongoDB Inc.
+ *
+ * This program is free software: you can redistribute it and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3,
+ * as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful,
+ * but WITHOUT ANY WARRANTY; without even the implied warranty of
+ * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU Affero General Public License for more details.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see <http://www.gnu.org/licenses/>.
+ *
+ * As a special exception, the copyright holders give permission to link the
+ * code of portions of this program with the OpenSSL library under certain
+ * conditions as described in each individual source file and distribute
+ * linked combinations including the program with the OpenSSL library. You
+ * must comply with the GNU Affero General Public License in all respects for
+ * all of the code used other than as permitted herein. If you modify file(s)
+ * with this exception, you may extend this exception to your version of the
+ * file(s), but you are not obligated to do so. If you do not wish to do so,
+ * delete this exception statement from your version. If you delete this
+ * exception statement from all source files in the program, then also delete
+ * it in the license file.
+ */
+
+#include "mongo/platform/basic.h"
+
+#include <memory>
+
+#include "mongo/client/dbclientinterface.h"
+#include "mongo/client/index_spec.h"
+#include "mongo/db/dbdirectclient.h"
+#include "mongo/db/logical_session_id.h"
+#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_context.h"
+#include "mongo/db/sessions_collection.h"
+#include "mongo/db/sessions_collection_standalone.h"
+#include "mongo/dbtests/dbtests.h"
+#include "mongo/stdx/memory.h"
+#include "mongo/util/time_support.h"
+
+namespace LogicalSessionTests {
+
+namespace {
+constexpr StringData kTestNS = "admin.system.sessions"_sd;
+
+LogicalSessionRecord makeRecord(Date_t time = Date_t::now()) {
+ auto record = makeLogicalSessionRecordForTest();
+ record.setLastUse(time);
+ return record;
+}
+
+Status insertRecord(OperationContext* opCtx, LogicalSessionRecord record) {
+ DBDirectClient client(opCtx);
+
+ client.insert(kTestNS.toString(), record.toBSON());
+ auto errorString = client.getLastError();
+ if (errorString.empty()) {
+ return Status::OK();
+ }
+
+ return {ErrorCodes::DuplicateSession, errorString};
+}
+
+} // namespace
+
+class SessionsCollectionStandaloneTest {
+public:
+ SessionsCollectionStandaloneTest()
+ : _collection(stdx::make_unique<SessionsCollectionStandalone>()) {
+ _opCtx = cc().makeOperationContext();
+ DBDirectClient db(opCtx());
+ db.remove(ns(), BSONObj());
+ }
+
+ virtual ~SessionsCollectionStandaloneTest() {
+ DBDirectClient db(opCtx());
+ db.remove(ns(), BSONObj());
+ _opCtx.reset();
+ }
+
+ SessionsCollectionStandalone* collection() {
+ return _collection.get();
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+ std::string ns() {
+ return SessionsCollection::kSessionsFullNS.toString();
+ }
+
+private:
+ std::unique_ptr<SessionsCollectionStandalone> _collection;
+ ServiceContext::UniqueOperationContext _opCtx;
+};
+
+// Test that removal from this collection works.
+class SessionsCollectionStandaloneRemoveTest : public SessionsCollectionStandaloneTest {
+public:
+ void run() {
+ auto record1 = makeRecord();
+ auto record2 = makeRecord();
+
+ auto res = insertRecord(opCtx(), record1);
+ ASSERT_OK(res);
+ res = insertRecord(opCtx(), record2);
+ ASSERT_OK(res);
+
+ // Remove one record, the other stays
+ res = collection()->removeRecords(opCtx(), {record1.getId()});
+ ASSERT_OK(res);
+
+ auto swRecord = collection()->fetchRecord(opCtx(), record1.getId());
+ ASSERT(!swRecord.isOK());
+
+ swRecord = collection()->fetchRecord(opCtx(), record2.getId());
+ ASSERT(swRecord.isOK());
+ }
+};
+
+// Test that refreshing entries in this collection works.
+class SessionsCollectionStandaloneRefreshTest : public SessionsCollectionStandaloneTest {
+public:
+ void run() {
+ DBDirectClient db(opCtx());
+
+ // Attempt to refresh one active record, should succeed.
+ auto now = Date_t::now();
+ auto thePast = now - Minutes(5);
+
+ auto record1 = makeRecord(thePast);
+ auto res = insertRecord(opCtx(), record1);
+ ASSERT_OK(res);
+ auto resRefresh = collection()->refreshSessions(opCtx(), {record1}, now);
+ ASSERT(resRefresh.isOK());
+
+ // The timestamp on the refreshed record should be updated.
+ auto swRecord = collection()->fetchRecord(opCtx(), record1.getId());
+ ASSERT(swRecord.isOK());
+ ASSERT_EQ(swRecord.getValue().getLastUse(), now);
+
+ // Clear the collection.
+ db.remove(ns(), BSONObj());
+
+ // Attempt to refresh a record that is not present, should upsert it.
+ auto record2 = makeRecord(thePast);
+ resRefresh = collection()->refreshSessions(opCtx(), {record2}, now);
+ ASSERT(resRefresh.isOK());
+
+ swRecord = collection()->fetchRecord(opCtx(), record2.getId());
+ ASSERT(swRecord.isOK());
+
+ // Clear the collection.
+ db.remove(ns(), BSONObj());
+
+ // Attempt a refresh of many records, split into batches.
+ LogicalSessionRecordSet toRefresh;
+ int recordCount = 5000;
+ for (int i = 0; i < recordCount; i++) {
+ auto record = makeRecord(thePast);
+ res = insertRecord(opCtx(), record);
+
+ // Refresh some of these records.
+ if (i % 4 == 0) {
+ toRefresh.insert(record);
+ }
+ }
+
+ // Run the refresh, should succeed.
+ resRefresh = collection()->refreshSessions(opCtx(), toRefresh, now);
+ ASSERT(resRefresh.isOK());
+
+ // Ensure that the right number of timestamps were updated.
+ auto n = db.count(ns(), BSON("lastUse" << now));
+ ASSERT_EQ(n, toRefresh.size());
+ }
+};
+
+class All : public Suite {
+public:
+ All() : Suite("logical_sessions") {}
+
+ void setupTests() {
+ add<SessionsCollectionStandaloneRemoveTest>();
+ add<SessionsCollectionStandaloneRefreshTest>();
+ }
+};
+
+SuiteInstance<All> all;
+
+} // namespace LogicalSessionTests
diff --git a/src/mongo/idl/idl_test.cpp b/src/mongo/idl/idl_test.cpp
index a67de83a429..3b0387b412c 100644
--- a/src/mongo/idl/idl_test.cpp
+++ b/src/mongo/idl/idl_test.cpp
@@ -547,10 +547,10 @@ TEST(IDLNestedStruct, TestDuplicateTypes) {
"field3" << BSON("field1" << 4 << "field2" << 5 << "field3" << 6));
auto testStruct = NestedWithDuplicateTypes::parse(ctxt, testDoc);
- assert_same_types<decltype(testStruct.getField1()), const RequiredStrictField3&>();
+ assert_same_types<decltype(testStruct.getField1()), RequiredStrictField3&>();
assert_same_types<decltype(testStruct.getField2()),
const boost::optional<RequiredNonStrictField3>&>();
- assert_same_types<decltype(testStruct.getField3()), const RequiredStrictField3&>();
+ assert_same_types<decltype(testStruct.getField3()), RequiredStrictField3&>();
ASSERT_EQUALS(1, testStruct.getField1().getField1());
ASSERT_EQUALS(2, testStruct.getField1().getField2());
@@ -1228,10 +1228,9 @@ TEST(IDLChainedType, TestChainedStruct) {
auto testStruct = Chained_struct_mixed::parse(ctxt, testDoc);
- assert_same_types<decltype(testStruct.getChained_any_basic_type()),
- const Chained_any_basic_type&>();
+ assert_same_types<decltype(testStruct.getChained_any_basic_type()), Chained_any_basic_type&>();
assert_same_types<decltype(testStruct.getChainedObjectBasicType()),
- const Chained_object_basic_type&>();
+ Chained_object_basic_type&>();
ASSERT_EQUALS(testStruct.getField3(), "abc");
diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h
index 21e37d0e438..6b80226aa55 100644
--- a/src/mongo/util/periodic_runner.h
+++ b/src/mongo/util/periodic_runner.h
@@ -34,6 +34,8 @@
namespace mongo {
+class Client;
+
/**
* An interface for objects that run work items at specified intervals.
*
@@ -41,10 +43,12 @@ namespace mongo {
* model they wish. Implementations may choose when to stop running
* scheduled jobs (for example, some implementations may stop running
* when the server is in global shutdown).
+ *
+ * The runner will create client objects that it passes to jobs to use.
*/
class PeriodicRunner {
public:
- using Job = stdx::function<void()>;
+ using Job = stdx::function<void(Client* client)>;
struct PeriodicJob {
PeriodicJob(Job callable, Milliseconds period)
diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp
index 9fadfc67124..5ff42019340 100644
--- a/src/mongo/util/periodic_runner_asio.cpp
+++ b/src/mongo/util/periodic_runner_asio.cpp
@@ -95,7 +95,7 @@ void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job) {
}
lockedJob->start = _timerFactory->now();
- lockedJob->job();
+ lockedJob->job(_client);
_io_service.post([this, job]() mutable { _scheduleJob(job); });
});
@@ -110,11 +110,16 @@ Status PeriodicRunnerASIO::startup() {
_state = State::kRunning;
_thread = stdx::thread([this]() {
try {
- Client::initThread("PeriodicRunnerASIO");
+ auto client = getGlobalServiceContext()->makeClient("PeriodicRunnerASIO");
+ _client = client.get();
+ Client::setCurrent(std::move(client));
asio::io_service::work workItem(_io_service);
std::error_code ec;
_io_service.run(ec);
+
+ client = Client::releaseCurrent();
+
if (ec) {
severe() << "Failure in PeriodicRunnerASIO: " << ec.message();
fassertFailed(40438);
diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h
index e5be509f0ea..dcf11429594 100644
--- a/src/mongo/util/periodic_runner_asio.h
+++ b/src/mongo/util/periodic_runner_asio.h
@@ -37,6 +37,8 @@
namespace mongo {
+class Client;
+
/**
* A PeriodicRunner implementation that uses the ASIO library's eventing system
* to schedule and run jobs at regular intervals.
@@ -106,6 +108,7 @@ private:
asio::io_service _io_service;
asio::io_service::strand _strand;
+ Client* _client;
stdx::thread _thread;
std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory;
diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp
index 7a5e7b5fc04..86397801b11 100644
--- a/src/mongo/util/periodic_runner_asio_test.cpp
+++ b/src/mongo/util/periodic_runner_asio_test.cpp
@@ -38,6 +38,9 @@
#include "mongo/util/periodic_runner_asio.h"
namespace mongo {
+
+class Client;
+
namespace {
class PeriodicRunnerASIOTestNoSetup : public unittest::Test {
@@ -91,7 +94,7 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) {
// Add a job, ensure that it runs once
PeriodicRunner::PeriodicJob job(
- [&count, &mutex, &cv] {
+ [&count, &mutex, &cv](Client*) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
count++;
@@ -128,7 +131,7 @@ TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) {
// Schedule a job before startup
PeriodicRunner::PeriodicJob job(
- [&count, &mutex, &cv] {
+ [&count, &mutex, &cv](Client*) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
count++;
@@ -153,7 +156,7 @@ TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) {
Milliseconds interval{5};
// Schedule a job before shutdown
- PeriodicRunner::PeriodicJob job([&count] { count++; }, interval);
+ PeriodicRunner::PeriodicJob job([&count](Client*) { count++; }, interval);
runner()->scheduleJob(std::move(job));
@@ -185,7 +188,7 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
// Add two jobs, ensure they both run the proper number of times
PeriodicRunner::PeriodicJob jobA(
- [&countA, &mutex, &cv] {
+ [&countA, &mutex, &cv](Client*) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
countA++;
@@ -195,7 +198,7 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) {
intervalA);
PeriodicRunner::PeriodicJob jobB(
- [&countB, &mutex, &cv] {
+ [&countB, &mutex, &cv](Client*) {
{
stdx::unique_lock<stdx::mutex> lk(mutex);
countB++;