diff options
Diffstat (limited to 'src/mongo')
29 files changed, 935 insertions, 407 deletions
diff --git a/src/mongo/db/SConscript b/src/mongo/db/SConscript index 9900e938843..e0737310d2e 100644 --- a/src/mongo/db/SConscript +++ b/src/mongo/db/SConscript @@ -879,6 +879,7 @@ env.Library( '$BUILD_DIR/mongo/crypto/sha_block_${MONGO_CRYPTO}', '$BUILD_DIR/mongo/idl/idl_parser', '$BUILD_DIR/mongo/util/uuid', + 'server_parameters', ], ) @@ -889,7 +890,6 @@ env.Library( ], LIBDEPS=[ 'logical_session_id', - 'logical_session_cache', '$BUILD_DIR/mongo/db/auth/authcore', ], ) @@ -977,6 +977,8 @@ env.Library( 'sessions_collection.cpp', ], LIBDEPS=[ + '$BUILD_DIR/mongo/base', + 'logical_session_id', ], ) @@ -993,12 +995,26 @@ env.Library( ) env.Library( + target='sessions_collection_standalone', + source=[ + 'sessions_collection_standalone.cpp', + ], + LIBDEPS=[ + '$BUILD_DIR/mongo/base', + 'dbdirectclient', + 'logical_session_id', + 'sessions_collection', + ], +) + +env.Library( target='logical_session_cache', source=[ 'logical_session_cache.cpp', ], LIBDEPS=[ 'logical_session_id', + 'logical_session_id_helpers', 'sessions_collection', 'server_parameters', 'service_liason', @@ -1016,6 +1032,7 @@ envWithAsio.CppUnitTest( 'keys_collection_manager', 'keys_collection_document', 'logical_clock', + 'logical_session_id', 'logical_session_id_helpers', 'logical_session_cache', 'service_liason_mock', @@ -1031,7 +1048,8 @@ envWithAsio.Library( LIBDEPS=[ 'logical_session_cache', 'service_liason_mongod', - 'sessions_collection_mock', # TODO SERVER-29201, SERVER-29202, SERVER-29203 + 'sessions_collection_mock', # TODO SERVER-29202, SERVER-29203 + 'sessions_collection_standalone', ], ) diff --git a/src/mongo/db/auth/authorization_session.cpp b/src/mongo/db/auth/authorization_session.cpp index 42735813bb9..a29e1c2ff5e 100644 --- a/src/mongo/db/auth/authorization_session.cpp +++ b/src/mongo/db/auth/authorization_session.cpp @@ -176,6 +176,22 @@ User* AuthorizationSession::lookupUser(const UserName& name) { return _authenticatedUsers.lookup(name); } +User* AuthorizationSession::getSingleUser() { + UserName userName; + + auto userNameItr = getAuthenticatedUserNames(); + if (userNameItr.more()) { + userName = userNameItr.next(); + if (userNameItr.more()) { + uasserted(ErrorCodes::Unauthorized, "there are no users authenticated"); + } + } else { + uasserted(ErrorCodes::Unauthorized, "too many users are authenticated"); + } + + return lookupUser(userName); +} + void AuthorizationSession::logoutDatabase(const std::string& dbname) { User* removedUser = _authenticatedUsers.removeByDBName(dbname); if (removedUser) { diff --git a/src/mongo/db/auth/authorization_session.h b/src/mongo/db/auth/authorization_session.h index 65588ab0708..eccb2dcbbc3 100644 --- a/src/mongo/db/auth/authorization_session.h +++ b/src/mongo/db/auth/authorization_session.h @@ -119,6 +119,10 @@ public: // and ownership of the user stays with the AuthorizationManager User* lookupUser(const UserName& name); + // Returns the single user on this auth session. If no user is authenticated, or if + // multiple users are authenticated, this method will throw an exception. + User* getSingleUser(); + // Gets an iterator over the names of all authenticated users stored in this manager. UserNameIterator getAuthenticatedUserNames(); diff --git a/src/mongo/db/commands/start_session_command.cpp b/src/mongo/db/commands/start_session_command.cpp index cd241c7a310..2e1d00b455f 100644 --- a/src/mongo/db/commands/start_session_command.cpp +++ b/src/mongo/db/commands/start_session_command.cpp @@ -82,25 +82,24 @@ public: auto client = opCtx->getClient(); ServiceContext* serviceContext = client->getServiceContext(); - boost::optional<LogicalSessionId> lsid; + auto lsCache = LogicalSessionCache::get(serviceContext); + boost::optional<LogicalSessionRecord> record; try { - lsid = makeLogicalSessionId(opCtx); + record = makeLogicalSessionRecord(opCtx, lsCache->now()); } catch (...) { auto status = exceptionToStatus(); return appendCommandStatus(result, status); } - auto lsCache = LogicalSessionCache::get(serviceContext); - - Status startSessionStatus = lsCache->startSession(lsid.get()); + Status startSessionStatus = lsCache->startSession(opCtx, record.get()); if (!startSessionStatus.isOK()) { return appendCommandStatus(result, startSessionStatus); } - makeLogicalSessionToClient(lsid.get()).serialize(&result); + makeLogicalSessionToClient(record->getId()).serialize(&result); return true; } diff --git a/src/mongo/db/logical_session_cache.cpp b/src/mongo/db/logical_session_cache.cpp index 36c1625a69c..7baeb66593d 100644 --- a/src/mongo/db/logical_session_cache.cpp +++ b/src/mongo/db/logical_session_cache.cpp @@ -33,6 +33,7 @@ #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" #include "mongo/db/operation_context.h" #include "mongo/db/server_parameters.h" #include "mongo/db/service_context.h" @@ -51,16 +52,11 @@ MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRecordCacheSize, int, LogicalSessionCache::kLogicalSessionCacheDefaultCapacity); -MONGO_EXPORT_STARTUP_SERVER_PARAMETER(localLogicalSessionTimeoutMinutes, - int, - LogicalSessionCache::kLogicalSessionDefaultTimeout.count()); - MONGO_EXPORT_STARTUP_SERVER_PARAMETER(logicalSessionRefreshMinutes, int, LogicalSessionCache::kLogicalSessionDefaultRefresh.count()); constexpr int LogicalSessionCache::kLogicalSessionCacheDefaultCapacity; -constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultTimeout; constexpr Minutes LogicalSessionCache::kLogicalSessionDefaultRefresh; LogicalSessionCache* LogicalSessionCache::get(ServiceContext* service) { @@ -85,7 +81,7 @@ LogicalSessionCache::LogicalSessionCache(std::unique_ptr<ServiceLiason> service, _service(std::move(service)), _sessionsColl(std::move(collection)), _cache(options.capacity) { - PeriodicRunner::PeriodicJob job{[this] { _refresh(); }, + PeriodicRunner::PeriodicJob job{[this](Client* client) { _refresh(client); }, duration_cast<Milliseconds>(_refreshInterval)}; _service->scheduleJob(std::move(job)); } @@ -100,7 +96,9 @@ LogicalSessionCache::~LogicalSessionCache() { } } -Status LogicalSessionCache::fetchAndPromote(LogicalSessionId lsid) { +// TODO: fetch should attempt to update user info, if it is not in the found record. + +Status LogicalSessionCache::fetchAndPromote(OperationContext* opCtx, const LogicalSessionId& lsid) { // Search our local cache first auto promoteRes = promote(lsid); if (promoteRes.isOK()) { @@ -108,12 +106,12 @@ Status LogicalSessionCache::fetchAndPromote(LogicalSessionId lsid) { } // Cache miss, must fetch from the sessions collection. - auto res = _sessionsColl->fetchRecord(lsid); + auto res = _sessionsColl->fetchRecord(opCtx, lsid); // If we got a valid record, add it to our cache. if (res.isOK()) { auto& record = res.getValue(); - record.setLastUse(_service->now()); + record.setLastUse(now()); // Any duplicate records here are actually the same record with different // lastUse times, ignore them. @@ -133,36 +131,26 @@ Status LogicalSessionCache::promote(LogicalSessionId lsid) { } // Do not use records if they have expired. - auto now = _service->now(); - if (_isDead(it->second, now)) { + auto time = now(); + if (_isDead(it->second, time)) { return {ErrorCodes::NoSuchSession, "no matching session record found in the cache"}; } // Update the last use time before returning. - it->second.setLastUse(now); + it->second.setLastUse(time); return Status::OK(); } -Status LogicalSessionCache::startSession(LogicalSessionId lsid) { - LogicalSessionRecord lsr; - lsr.setId(lsid); - lsr.setLastUse(_service->now()); - - // Attempt to insert into the sessions collection first. This collection enforces - // unique session ids, so it will act as concurrency control for us. - auto res = _sessionsColl->insertRecord(lsr); - if (!res.isOK()) { - return res; - } +Status LogicalSessionCache::startSession(OperationContext* opCtx, LogicalSessionRecord record) { + // Add the new record to our local cache. We will insert it into the sessions collection + // the next time _refresh is called. - // Add the new record to our local cache. If we get a conflict here, and the - // conflicting record is not dead and is not equal to our record, an interloper - // may have ended this session and then created a new one with the same id. - // In this case, return a failure. - auto oldRecord = _addToCache(lsr); + // If we get a conflict here, then an interloper may have ended this session + // and then created a new one with the same id. In this case, return a failure. + auto oldRecord = _addToCache(record); if (oldRecord) { - if (*oldRecord != lsr) { - if (!_isDead(*oldRecord, _service->now())) { + if (*oldRecord != record) { + if (!_isDead(*oldRecord, now())) { return {ErrorCodes::DuplicateSession, "session with this id already exists"}; } } @@ -171,11 +159,19 @@ Status LogicalSessionCache::startSession(LogicalSessionId lsid) { return Status::OK(); } -void LogicalSessionCache::_refresh() { - LogicalSessionIdSet activeSessions; - LogicalSessionIdSet deadSessions; +void LogicalSessionCache::refreshNow(Client* client) { + return _refresh(client); +} + +Date_t LogicalSessionCache::now() { + return _service->now(); +} - auto now = _service->now(); +void LogicalSessionCache::_refresh(Client* client) { + LogicalSessionRecordSet activeSessions; + LogicalSessionRecordSet deadSessions; + + auto time = now(); // We should avoid situations where we have records in the cache // that have been expired from the sessions collection. If they haven't been @@ -190,10 +186,10 @@ void LogicalSessionCache::_refresh() { for (auto& it : cacheCopy) { auto record = it.second; - if (!_isDead(record, now)) { - activeSessions.insert(LogicalSessionId{record.getId()}); + if (!_isDead(record, time)) { + activeSessions.insert(record); } else { - deadSessions.insert(LogicalSessionId{record.getId()}); + deadSessions.insert(record); } } @@ -215,17 +211,28 @@ void LogicalSessionCache::_refresh() { if (it != _cache.end()) { // If we have not found our record, it may have been removed // by another thread. - it->second.setLastUse(now); + it->second.setLastUse(time); + activeSessions.insert(it->second); } - activeSessions.insert(lsid); + // TODO SERVER-29709: Rethink how active sessions interact with refreshes, + // and potentially move this block above the block where we separate + // dead sessions from live sessions, above. + activeSessions.insert(makeLogicalSessionRecord(lsid, time)); } } // Query into the sessions collection to do the refresh. If any sessions have // failed to refresh, it means their authoritative records were removed, and // we should remove such records from our cache as well. - auto failedToRefresh = _sessionsColl->refreshSessions(std::move(activeSessions)); + { + auto opCtx = client->makeOperationContext(); + auto res = _sessionsColl->refreshSessions(opCtx.get(), std::move(activeSessions), time); + if (!res.isOK()) { + // TODO SERVER-29709: handle network errors here. + return; + } + } // Prune any dead records out of the cache. Dead records are ones that failed to // refresh, or ones that have expired locally. We don't make an effort to check @@ -233,13 +240,7 @@ void LogicalSessionCache::_refresh() { // sessions collection. We also don't attempt to resurrect our expired records. // However, we *do* keep records alive if they are active on the service. { - stdx::unique_lock<stdx::mutex> lk(_cacheMutex); - for (auto deadId : failedToRefresh) { - auto it = serviceSessions.find(deadId); - if (it == serviceSessions.end()) { - _cache.erase(deadId); - } - } + // TODO SERVER-29709: handle expiration separately from failure to refresh. } } diff --git a/src/mongo/db/logical_session_cache.h b/src/mongo/db/logical_session_cache.h index 5d901c191ce..d5d8483f751 100644 --- a/src/mongo/db/logical_session_cache.h +++ b/src/mongo/db/logical_session_cache.h @@ -39,11 +39,11 @@ namespace mongo { +class Client; class OperationContext; class ServiceContext; extern int logicalSessionRecordCacheSize; -extern int localLogicalSessionTimeoutMinutes; extern int logicalSessionRefreshMinutes; /** @@ -62,7 +62,6 @@ public: static void set(ServiceContext* service, std::unique_ptr<LogicalSessionCache> sessionCache); static constexpr int kLogicalSessionCacheDefaultCapacity = 10000; - static constexpr Minutes kLogicalSessionDefaultTimeout = Minutes(30); static constexpr Minutes kLogicalSessionDefaultRefresh = Minutes(5); /** @@ -127,7 +126,7 @@ public: * * This method may issue networking calls. */ - Status fetchAndPromote(LogicalSessionId lsid); + Status fetchAndPromote(OperationContext* opCtx, const LogicalSessionId& lsid); /** * Inserts a new authoritative session record into the cache. This method will @@ -135,7 +134,7 @@ public: * should only be used when starting new sessions and should not be used to * insert records for existing sessions. */ - Status startSession(LogicalSessionId lsid); + Status startSession(OperationContext* opCtx, LogicalSessionRecord record); /** * Removes all local records in this cache. Does not remove the corresponding @@ -143,12 +142,23 @@ public: */ void clear(); + /** + * Refreshes the cache synchronously. This flushes all pending refreshes and + * inserts to the sessions collection. + */ + void refreshNow(Client* client); + + /** + * Returns the current time. + */ + Date_t now(); + private: /** * Internal methods to handle scheduling and perform refreshes for active * session records contained within the cache. */ - void _refresh(); + void _refresh(Client* client); /** * Returns true if a record has passed its given expiration. diff --git a/src/mongo/db/logical_session_cache_factory_mongod.cpp b/src/mongo/db/logical_session_cache_factory_mongod.cpp index 6f253ca5116..883e9a3da62 100644 --- a/src/mongo/db/logical_session_cache_factory_mongod.cpp +++ b/src/mongo/db/logical_session_cache_factory_mongod.cpp @@ -34,6 +34,7 @@ #include "mongo/db/service_liason_mongod.h" #include "mongo/db/sessions_collection_mock.h" +#include "mongo/db/sessions_collection_standalone.h" #include "mongo/stdx/memory.h" namespace mongo { @@ -51,9 +52,7 @@ std::unique_ptr<SessionsCollection> makeSessionsCollection(LogicalSessionCacheSe return stdx::make_unique<MockSessionsCollection>( std::make_shared<MockSessionsCollectionImpl>()); case LogicalSessionCacheServer::kStandalone: - // TODO SERVER-29201, replace with SessionsCollectionStandalone - return stdx::make_unique<MockSessionsCollection>( - std::make_shared<MockSessionsCollectionImpl>()); + return stdx::make_unique<SessionsCollectionStandalone>(); default: MONGO_UNREACHABLE; } diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index 381801cecab..436f0af2267 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -33,6 +33,8 @@ #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/service_context_noop.h" #include "mongo/db/service_liason_mock.h" #include "mongo/db/sessions_collection_mock.h" #include "mongo/stdx/future.h" @@ -42,8 +44,7 @@ namespace mongo { namespace { -const Milliseconds kSessionTimeout = - duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultTimeout); +const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout); const Milliseconds kForceRefresh = duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultRefresh); @@ -60,6 +61,11 @@ public: _sessions(std::make_shared<MockSessionsCollectionImpl>()) {} void setUp() override { + auto client = serviceContext.makeClient("testClient"); + _opCtx = client->makeOperationContext(); + _client = client.get(); + Client::setCurrent(std::move(client)); + auto mockService = stdx::make_unique<MockServiceLiason>(_service); auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions); _cache = @@ -67,7 +73,12 @@ public: } void tearDown() override { + if (_opCtx) { + _opCtx.reset(); + } + _service->join(); + auto client = Client::releaseCurrent(); } void waitUntilRefreshScheduled() { @@ -88,11 +99,32 @@ public: return _sessions; } + void setOpCtx() { + _opCtx = client()->makeOperationContext(); + } + + void clearOpCtx() { + _opCtx.reset(); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + + Client* client() { + return _client; + } + private: + ServiceContextNoop serviceContext; + ServiceContext::UniqueOperationContext _opCtx; + std::shared_ptr<MockServiceLiasonImpl> _service; std::shared_ptr<MockSessionsCollectionImpl> _sessions; std::unique_ptr<LogicalSessionCache> _cache; + + Client* _client; }; // Test that session cache fetches new records from the sessions collection @@ -100,12 +132,12 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) { auto lsid = makeLogicalSessionIdForTest(); // When the record is not present (and not in the sessions collection) returns an error - auto res = cache()->fetchAndPromote(lsid); + auto res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(!res.isOK()); // When the record is not present (but is in the sessions collection) returns it sessions()->add(makeLogicalSessionRecord(lsid, service()->now())); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // When the record is present in the cache, returns it @@ -115,7 +147,7 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) { return {ErrorCodes::NoSuchSession, "no such session"}; }); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); } @@ -133,7 +165,7 @@ TEST_F(LogicalSessionCacheTest, TestCacheHitsOnly) { ASSERT(!res.isOK()); // When the record is present, returns the owner - cache()->fetchAndPromote(lsid).transitional_ignore(); + cache()->fetchAndPromote(opCtx(), lsid).transitional_ignore(); res = cache()->promote(lsid); ASSERT(res.isOK()); } @@ -150,17 +182,17 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) { // Fast forward time and fetch service()->fastForward(Milliseconds(500)); ASSERT(start != service()->now()); - auto res = cache()->fetchAndPromote(lsid); + auto res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // Now that we fetched, lifetime of session should be extended service()->fastForward(kSessionTimeout - Milliseconds(500)); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // We fetched again, so lifetime extended again service()->fastForward(kSessionTimeout - Milliseconds(10)); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // Fast forward and hit-only fetch @@ -181,56 +213,63 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) { // Test the startSession method TEST_F(LogicalSessionCacheTest, StartSession) { - auto lsid = makeLogicalSessionIdForTest(); + auto record = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now()); + auto lsid = record.getId(); // Test starting a new session - auto res = cache()->startSession(lsid); + auto res = cache()->startSession(opCtx(), record); ASSERT(res.isOK()); + + // Record will not be in the collection yet; refresh must happen first. + ASSERT(!sessions()->has(lsid)); + + // Do refresh, cached records should get flushed to collection. + clearOpCtx(); + cache()->refreshNow(client()); ASSERT(sessions()->has(lsid)); - // Try to start a session that is already in the sessions collection and our - // local cache, should fail - res = cache()->startSession(lsid); - ASSERT(!res.isOK()); + // Try to start the same session again, should succeed. + res = cache()->startSession(opCtx(), record); + ASSERT(res.isOK()); // Try to start a session that is already in the sessions collection but - // is not in our local cache, should fail + // is not in our local cache, should succeed. auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now()); - auto lsid2 = record2.getId(); - sessions()->add(std::move(record2)); - res = cache()->startSession(lsid2); - ASSERT(!res.isOK()); + sessions()->add(record2); + res = cache()->startSession(opCtx(), record2); + ASSERT(res.isOK()); // Try to start a session that has expired from our cache, and is no // longer in the sessions collection, should succeed service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); sessions()->remove(lsid); ASSERT(!sessions()->has(lsid)); - res = cache()->startSession(lsid); + res = cache()->startSession(opCtx(), record); ASSERT(res.isOK()); - ASSERT(sessions()->has(lsid)); } // Test that records in the cache are properly refreshed until they expire TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { // Insert two records into the cache - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - cache()->startSession(lsid2).transitional_ignore(); + auto record1 = makeLogicalSessionRecordForTest(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record1).transitional_ignore(); + cache()->startSession(opCtx(), record2).transitional_ignore(); stdx::promise<int> hitRefresh; auto refreshFuture = hitRefresh.get_future(); // Advance time to first refresh point, check that refresh happens, and // that it includes both our records - sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) { + sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) { hitRefresh.set_value(sessions.size()); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Wait for the refresh to happen + clearOpCtx(); service()->fastForward(kForceRefresh); + cache()->refreshNow(client()); refreshFuture.wait(); ASSERT_EQ(refreshFuture.get(), 2); @@ -240,98 +279,39 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { auto refresh2Future = refresh2.get_future(); // Use one of the records - auto res = cache()->fetchAndPromote(lsid1); + setOpCtx(); + auto res = cache()->fetchAndPromote(opCtx(), record1.getId()); ASSERT(res.isOK()); // Advance time so that one record expires // Ensure that first record was refreshed, and second was thrown away - sessions()->setRefreshHook([&refresh2](LogicalSessionIdSet sessions) { + sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) { // We should only have one record here, the other should have expired ASSERT_EQ(sessions.size(), size_t(1)); - refresh2.set_value(*(sessions.begin())); - return LogicalSessionIdSet{}; + refresh2.set_value(sessions.begin()->getId()); + return Status::OK(); }); - // Wait until the second job has been scheduled - waitUntilRefreshScheduled(); - + clearOpCtx(); service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1)); + cache()->refreshNow(client()); refresh2Future.wait(); - ASSERT_EQ(refresh2Future.get(), lsid1); -} - -// Test that cache deletes records that fail to refresh -TEST_F(LogicalSessionCacheTest, CacheDeletesRecordsThatFailToRefresh) { - // Put two sessions into the cache - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Record 1 fails to refresh - sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) { - ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{lsid1}; - }); - - // Force a refresh - service()->fastForward(kForceRefresh); - refreshFuture.wait(); - - // Ensure that one record is still there and the other is gone - auto res = cache()->promote(lsid1); - ASSERT(!res.isOK()); - res = cache()->promote(lsid2); - ASSERT(res.isOK()); -} - -// Test that we don't remove records that fail to refresh if they are active on the service -TEST_F(LogicalSessionCacheTest, KeepActiveSessionAliveEvenIfRefreshFails) { - // Put two sessions into the cache, one into the service - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - service()->add(lsid1); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // SignedLsid 1 fails to refresh - sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) { - ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{lsid1}; - }); - - // Force a refresh - service()->fastForward(kForceRefresh); - refreshFuture.wait(); - - // Ensure that both lsids are still there - auto res = cache()->promote(lsid1); - ASSERT(res.isOK()); - res = cache()->promote(lsid2); - ASSERT(res.isOK()); + ASSERT_EQ(refresh2Future.get(), record1.getId()); } // Test that session cache properly expires lsids after 30 minutes of no use TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) { // Insert a lsid - auto lsid = makeLogicalSessionIdForTest(); - cache()->startSession(lsid).transitional_ignore(); - auto res = cache()->promote(lsid); + auto record = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record).transitional_ignore(); + auto res = cache()->promote(record.getId()); ASSERT(res.isOK()); // Force it to expire service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); // Check that it is no longer in the cache - res = cache()->promote(lsid); + res = cache()->promote(record.getId()); ASSERT(!res.isOK()); } @@ -342,47 +322,31 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) { // Insert one active lsid on the service, none in the cache service()->add(lsid); - stdx::mutex mutex; - stdx::condition_variable cv; int count = 0; - sessions()->setRefreshHook([&cv, &mutex, &count, &lsid](LogicalSessionIdSet sessions) { - ASSERT_EQ(*(sessions.begin()), lsid); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - - return LogicalSessionIdSet{}; + sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) { + ASSERT_EQ(sessions.size(), size_t(1)); + ASSERT_EQ(sessions.begin()->getId(), lsid); + count++; + return Status::OK(); }); + clearOpCtx(); + // Force a refresh, it should refresh our active session service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 1; }); - } - - // Wait until the next job has been scheduled - waitUntilRefreshScheduled(); + cache()->refreshNow(client()); + ASSERT_EQ(count, 1); // Force a session timeout, session is still on the service service()->fastForward(kSessionTimeout); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 2; }); - } - - // Wait until the next job has been scheduled - waitUntilRefreshScheduled(); + cache()->refreshNow(client()); + ASSERT_EQ(count, 2); // Force another refresh, check that it refreshes that active lsid again service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 3; }); - } + cache()->refreshNow(client()); + ASSERT_EQ(count, 3); } // Test that the set of lsids we refresh is a sum of cached + active lsids @@ -390,45 +354,39 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) { // Put one session into the cache, one into the service auto lsid1 = makeLogicalSessionIdForTest(); service()->add(lsid1); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record2).transitional_ignore(); - // Both lsids refresh - sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) { + // Both signedLsids refresh + sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test large sets of cache-only session lsids TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) { int count = LogicalSessionCache::kLogicalSessionCacheDefaultCapacity; for (int i = 0; i < count; i++) { - auto lsid = makeLogicalSessionIdForTest(); - cache()->startSession(lsid).transitional_ignore(); + auto record = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record).transitional_ignore(); } - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Check that all lsids refresh - sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) { + // Check that all signedLsids refresh + sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(count)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test larger sets of service-only session lsids @@ -439,19 +397,16 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) { service()->add(lsid); } - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Check that all lsids refresh - sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) { + // Check that all signedLsids refresh + sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(count)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test larger mixed sets of cache/service active sessions @@ -461,77 +416,44 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) { auto lsid = makeLogicalSessionIdForTest(); service()->add(lsid); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid2).transitional_ignore(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record2).transitional_ignore(); } - stdx::mutex mutex; - stdx::condition_variable cv; - int refreshes = 0; int nRefreshed = 0; // Check that all lsids refresh successfully - sessions()->setRefreshHook( - [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - refreshes++; - nRefreshed = sessions.size(); - } - cv.notify_all(); - - return LogicalSessionIdSet{}; - }); + sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { + nRefreshed = sessions.size(); + return Status::OK(); + }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 1; }); - } + cache()->refreshNow(client()); ASSERT_EQ(nRefreshed, count * 2); // Remove all of the service sessions, should just refresh the cache entries - // (and make all but one fail to refresh) service()->clear(); - sessions()->setRefreshHook( - [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - refreshes++; - nRefreshed = sessions.size(); - } - cv.notify_all(); - - sessions.erase(sessions.begin()); - return sessions; - }); - - // Wait for job to be scheduled - waitUntilRefreshScheduled(); + sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { + nRefreshed = sessions.size(); + return Status::OK(); + }); // Force another refresh service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 2; }); - } + cache()->refreshNow(client()); // We should not have refreshed any sessions from the service, only the cache ASSERT_EQ(nRefreshed, count); - // Wait for job to be scheduled - waitUntilRefreshScheduled(); - // Force a third refresh service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 3; }); - } + cache()->refreshNow(client()); - // Since all but one lsid failed to refresh, third set should just have one lsid - ASSERT_EQ(nRefreshed, 1); + // Again, we should have only refreshed sessions from the cache + ASSERT_EQ(nRefreshed, count); } } // namespace diff --git a/src/mongo/db/logical_session_id.cpp b/src/mongo/db/logical_session_id.cpp index ad1dd8a496d..3f59f19f766 100644 --- a/src/mongo/db/logical_session_id.cpp +++ b/src/mongo/db/logical_session_id.cpp @@ -1,41 +1,42 @@ /** - * Copyright (C) 2017 MongoDB Inc. + * Copyright (C) 2017 MongoDB Inc. * - * This program is free software: you can redistribute it and/or modify - * it under the terms of the GNU Affero General Public License, version 3, - * as published by the Free Software Foundation. + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. * - * This program is distributed in the hope that it will be useful, - * but WITHOUT ANY WARRANTY; without even the implied warranty of - * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the - * GNU Affero General Public License for more details. + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see <http://www.gnu.org/licenses/>. + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. * - * As a special exception, the copyright holders give permission to link the - * code of portions of this program with the OpenSSL library under certain - * conditions as described in each individual source file and distribute - * linked combinations including the program with the OpenSSL library. You - * must comply with the GNU Affero General Public License in all respects for - * all of the code used other than as permitted herein. If you modify file(s) - * with this exception, you may extend this exception to your version of the - * file(s), but you are not obligated to do so. If you do not wish to do so, - * delete this exception statement from your version. If you delete this - * exception statement from all source files in the program, then also delete - * it in the license file. + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. */ #include "mongo/platform/basic.h" #include "mongo/db/logical_session_id.h" - -#include "mongo/bson/bsonobjbuilder.h" -#include "mongo/db/logical_session_cache.h" -#include "mongo/util/assert_util.h" +#include "mongo/db/server_parameters.h" namespace mongo { +MONGO_EXPORT_STARTUP_SERVER_PARAMETER(localLogicalSessionTimeoutMinutes, + int, + kLogicalSessionDefaultTimeout.count()); + LogicalSessionId makeLogicalSessionIdForTest() { LogicalSessionId lsid; @@ -45,4 +46,12 @@ LogicalSessionId makeLogicalSessionIdForTest() { return lsid; } +LogicalSessionRecord makeLogicalSessionRecordForTest() { + LogicalSessionRecord record{}; + + record.setId(makeLogicalSessionIdForTest()); + + return record; +} + } // namespace mongo diff --git a/src/mongo/db/logical_session_id.h b/src/mongo/db/logical_session_id.h index 4984a57cc30..37937d79a56 100644 --- a/src/mongo/db/logical_session_id.h +++ b/src/mongo/db/logical_session_id.h @@ -44,9 +44,11 @@ const StmtId kUninitializedStmtId = -1; const TxnNumber kUninitializedTxnNumber = -1; class BSONObjBuilder; - class OperationContext; +const Minutes kLogicalSessionDefaultTimeout = Minutes(30); +extern int localLogicalSessionTimeoutMinutes; + inline bool operator==(const LogicalSessionId& lhs, const LogicalSessionId& rhs) { auto makeEqualityLens = [](const auto& lsid) { return std::tie(lsid.getId(), lsid.getUid()); }; @@ -67,6 +69,8 @@ inline bool operator!=(const LogicalSessionRecord& lhs, const LogicalSessionReco LogicalSessionId makeLogicalSessionIdForTest(); +LogicalSessionRecord makeLogicalSessionRecordForTest(); + struct LogicalSessionIdHash { std::size_t operator()(const LogicalSessionId& lsid) const { return _hasher(lsid.getId()); @@ -106,5 +110,6 @@ inline StringBuilder& operator<<(StringBuilder& s, const LogicalSessionFromClien * An alias for sets of session ids. */ using LogicalSessionIdSet = stdx::unordered_set<LogicalSessionId, LogicalSessionIdHash>; +using LogicalSessionRecordSet = stdx::unordered_set<LogicalSessionRecord, LogicalSessionRecordHash>; } // namespace mongo diff --git a/src/mongo/db/logical_session_id.idl b/src/mongo/db/logical_session_id.idl index ccedd28e6f6..c540c3c2174 100644 --- a/src/mongo/db/logical_session_id.idl +++ b/src/mongo/db/logical_session_id.idl @@ -64,12 +64,27 @@ structs: id: LogicalSessionIdToClient timeoutMinutes: int + UserNameWithId: + description: "A struct representing the combination of a UserName and an id" + strict: true + fields: + name: + type: string + id: + type: objectid + optional: true + LogicalSessionRecord: description: "A struct representing a LogicalSessionRecord" strict: true fields: - id: LogicalSessionId + _id: + type: LogicalSessionId + cpp_name: id lastUse: date + user: + type: UserNameWithId + optional: true LogicalSessionFromClient: description: "A struct representing a LogicalSessionId from external clients" diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp index fbed0134c83..ae68e5037bf 100644 --- a/src/mongo/db/logical_session_id_helpers.cpp +++ b/src/mongo/db/logical_session_id_helpers.cpp @@ -52,22 +52,7 @@ SHA256Block lookupUserDigest(OperationContext* opCtx) { if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) { UserName userName; - auto authzSession = AuthorizationSession::get(client); - auto userNameItr = authzSession->getAuthenticatedUserNames(); - if (userNameItr.more()) { - userName = userNameItr.next(); - if (userNameItr.more()) { - uasserted(ErrorCodes::Unauthorized, - "must only be authenticated as exactly one user " - "to create a logical session"); - } - } else { - uasserted(ErrorCodes::Unauthorized, - "must only be authenticated as exactly one user " - "to create a logical session"); - } - - User* user = authzSession->lookupUser(userName); + auto user = AuthorizationSession::get(client)->getSingleUser(); invariant(user); return user->getDigest(); @@ -109,8 +94,36 @@ LogicalSessionId makeLogicalSessionId(OperationContext* opCtx) { return id; } +LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, Date_t lastUse) { + LogicalSessionId id{}; + LogicalSessionRecord lsr{}; + + auto client = opCtx->getClient(); + ServiceContext* serviceContext = client->getServiceContext(); + if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) { + auto user = AuthorizationSession::get(client)->getSingleUser(); + invariant(user); + + id.setUid(user->getDigest()); + + UserNameWithId userDoc{}; + userDoc.setName(StringData(user->getName().toString())); + userDoc.setId(user->getID()); + lsr.setUser(userDoc); + } else { + id.setUid(kNoAuthDigest); + } + + id.setId(UUID::gen()); + + lsr.setId(id); + lsr.setLastUse(lastUse); + + return lsr; +} + LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date_t lastUse) { - LogicalSessionRecord lsr; + LogicalSessionRecord lsr{}; lsr.setId(lsid); lsr.setLastUse(lastUse); @@ -118,11 +131,35 @@ LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date return lsr; } +LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, + const LogicalSessionId& lsid, + Date_t lastUse) { + auto lsr = makeLogicalSessionRecord(lsid, lastUse); + + auto client = opCtx->getClient(); + ServiceContext* serviceContext = client->getServiceContext(); + if (AuthorizationManager::get(serviceContext)->isAuthEnabled()) { + auto user = AuthorizationSession::get(client)->getSingleUser(); + invariant(user); + + if (user->getDigest() == lsid.getUid()) { + UserNameWithId userDoc{}; + userDoc.setName(StringData(user->getName().toString())); + userDoc.setId(user->getID()); + lsr.setUser(userDoc); + } + } + + return lsr; +} + + LogicalSessionToClient makeLogicalSessionToClient(const LogicalSessionId& lsid) { LogicalSessionIdToClient lsitc; lsitc.setId(lsid.getId()); LogicalSessionToClient id; + id.setId(lsitc); id.setTimeoutMinutes(localLogicalSessionTimeoutMinutes); diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h index 5f1ff3c854e..e7cf637d984 100644 --- a/src/mongo/db/logical_session_id_helpers.h +++ b/src/mongo/db/logical_session_id_helpers.h @@ -32,10 +32,24 @@ namespace mongo { +/** + * Factory functions to generate logical session records. + */ LogicalSessionId makeLogicalSessionId(const LogicalSessionFromClient& lsid, OperationContext* opCtx); LogicalSessionId makeLogicalSessionId(OperationContext* opCtx); + +/** + * Factory functions to make logical session records. The overloads that + * take an OperationContext should be used when possible, as they will also set the + * user information on the record. + */ LogicalSessionRecord makeLogicalSessionRecord(const LogicalSessionId& lsid, Date_t lastUse); +LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, Date_t lastUse); +LogicalSessionRecord makeLogicalSessionRecord(OperationContext* opCtx, + const LogicalSessionId& lsid, + Date_t lastUse); + LogicalSessionToClient makeLogicalSessionToClient(const LogicalSessionId& lsid); /** diff --git a/src/mongo/db/namespace_string.cpp b/src/mongo/db/namespace_string.cpp index 44fbc2fe003..35fbde965dd 100644 --- a/src/mongo/db/namespace_string.cpp +++ b/src/mongo/db/namespace_string.cpp @@ -115,6 +115,8 @@ bool NamespaceString::isLegalClientSystemNS() const { return true; if (ns() == "admin.system.backup_users") return true; + if (ns() == "admin.system.sessions") + return true; } if (ns() == "local.system.replset") return true; diff --git a/src/mongo/db/s/migration_util.cpp b/src/mongo/db/s/migration_util.cpp index 0eccd40f3db..234aae0565d 100644 --- a/src/mongo/db/s/migration_util.cpp +++ b/src/mongo/db/s/migration_util.cpp @@ -62,4 +62,5 @@ BSONObj makeMigrationStatusDocument(const NamespaceString& nss, } } // namespace migrationutil + } // namespace mongo diff --git a/src/mongo/db/service_liason_mock.cpp b/src/mongo/db/service_liason_mock.cpp index 4c2ba699acb..821de302e02 100644 --- a/src/mongo/db/service_liason_mock.cpp +++ b/src/mongo/db/service_liason_mock.cpp @@ -56,7 +56,8 @@ Date_t MockServiceLiasonImpl::now() const { } void MockServiceLiasonImpl::scheduleJob(PeriodicRunner::PeriodicJob job) { - _runner->scheduleJob(std::move(job)); + // The cache should be refreshed from tests by calling refreshNow(). + return; } void MockServiceLiasonImpl::add(LogicalSessionId lsid) { diff --git a/src/mongo/db/sessions_collection.cpp b/src/mongo/db/sessions_collection.cpp index 1838d0cfe0e..b5c66195afb 100644 --- a/src/mongo/db/sessions_collection.cpp +++ b/src/mongo/db/sessions_collection.cpp @@ -30,8 +30,125 @@ #include "mongo/db/sessions_collection.h" +#include <memory> + +#include "mongo/bson/bsonobjbuilder.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/stdx/functional.h" +#include "mongo/stdx/memory.h" + namespace mongo { +namespace { + +BSONObj lsidQuery(const LogicalSessionId& lsid) { + return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()); +} + +BSONObj lsidQuery(const LogicalSessionRecord& record) { + return lsidQuery(record.getId()); +} + +BSONObj updateQuery(const LogicalSessionRecord& record, Date_t refreshTime) { + // { $max : { lastUse : <time> }, $setOnInsert : { user : <user> } } + + // Build our update doc. + BSONObjBuilder updateBuilder; + + { + BSONObjBuilder maxBuilder(updateBuilder.subobjStart("$max")); + maxBuilder.append(LogicalSessionRecord::kLastUseFieldName, refreshTime); + } + + if (record.getUser()) { + BSONObjBuilder setBuilder(updateBuilder.subobjStart("$setOnInsert")); + setBuilder.append(LogicalSessionRecord::kUserFieldName, record.getUser()->toBSON()); + } + + return updateBuilder.obj(); +} + +template <typename InitBatchFn, typename AddLineFn, typename SendBatchFn, typename Container> +Status runBulkCmd(StringData label, + InitBatchFn&& initBatch, + AddLineFn&& addLine, + SendBatchFn&& sendBatch, + const Container& items) { + int i = 0; + BufBuilder buf; + + boost::optional<BSONObjBuilder> batchBuilder; + boost::optional<BSONArrayBuilder> entries; + + auto setupBatchBuilder = [&] { + buf.reset(); + batchBuilder.emplace(buf); + initBatch(&(batchBuilder.get())); + entries.emplace(batchBuilder->subarrayStart(label)); + }; + + auto sendLocalBatch = [&] { + entries->done(); + return sendBatch(batchBuilder->done()); + }; + + setupBatchBuilder(); + + for (const auto& item : items) { + addLine(&(entries.get()), item); + + if (++i >= 1000) { + auto res = sendLocalBatch(); + if (!res.isOK()) { + return res; + } + + setupBatchBuilder(); + i = 0; + } + } + + return sendLocalBatch(); +} + +} // namespace + + +constexpr StringData SessionsCollection::kSessionsDb; +constexpr StringData SessionsCollection::kSessionsCollection; +constexpr StringData SessionsCollection::kSessionsFullNS; + + SessionsCollection::~SessionsCollection() = default; +Status SessionsCollection::doRefresh(const LogicalSessionRecordSet& sessions, + Date_t refreshTime, + SendBatchFn send) { + auto init = [](BSONObjBuilder* batch) { + batch->append("update", kSessionsCollection); + batch->append("ordered", false); + }; + + auto add = [&refreshTime](BSONArrayBuilder* entries, const LogicalSessionRecord& record) { + entries->append(BSON("q" << lsidQuery(record) << "u" << updateQuery(record, refreshTime) + << "upsert" + << true)); + }; + + return runBulkCmd("updates", init, add, send, sessions); +} + +Status SessionsCollection::doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send) { + auto init = [](BSONObjBuilder* batch) { + batch->append("delete", kSessionsCollection); + batch->append("ordered", false); + }; + + auto add = [](BSONArrayBuilder* builder, const LogicalSessionId& lsid) { + builder->append(BSON("q" << lsidQuery(lsid) << "limit" << 0)); + }; + + return runBulkCmd("deletes", init, add, send, sessions); +} + } // namespace mongo diff --git a/src/mongo/db/sessions_collection.h b/src/mongo/db/sessions_collection.h index 4a7503a293e..e1cda80934f 100644 --- a/src/mongo/db/sessions_collection.h +++ b/src/mongo/db/sessions_collection.h @@ -29,9 +29,14 @@ #pragma once #include "mongo/db/logical_session_id.h" +#include "mongo/stdx/functional.h" namespace mongo { +class BSONArrayBuilder; +class BSONObjBuilder; +class OperationContext; + /** * An abstract interface describing the entrypoint into the sessions collection. * @@ -42,37 +47,50 @@ class SessionsCollection { public: virtual ~SessionsCollection(); + static constexpr StringData kSessionsDb = "admin"_sd; + static constexpr StringData kSessionsCollection = "system.sessions"_sd; + static constexpr StringData kSessionsFullNS = "admin.system.sessions"_sd; + /** * Returns a LogicalSessionRecord for the given session id. This method * may run networking operations on the calling thread. */ - virtual StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id) = 0; - - /** - * Inserts the given record into the sessions collection. This method may run - * networking operations on the calling thread. - * - * Returns a DuplicateSession error if the session already exists in the - * sessions collection. - */ - virtual Status insertRecord(LogicalSessionRecord record) = 0; + virtual StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx, + const LogicalSessionId& id) = 0; /** * Updates the last-use times on the given sessions to be greater than - * or equal to the current time. + * or equal to the given time. * * Returns a list of sessions for which no authoritative record was found, - * and hence were not refreshed. + * and hence were not refreshed. Returns an error if a networking issue occurred. */ - virtual LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions) = 0; + virtual Status refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) = 0; /** * Removes the authoritative records for the specified sessions. * * Implementations should perform authentication checks to ensure that * session records may only be removed if their owner is logged in. + * + * Returns an error if the removal fails, for example from a network error. + */ + virtual Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) = 0; + +protected: + using SendBatchFn = stdx::function<Status(BSONObj batch)>; + + /** + * Formats and sends batches of refreshes for the given set of sessions. + */ + Status doRefresh(const LogicalSessionRecordSet& sessions, Date_t refreshTime, SendBatchFn send); + + /** + * Formats and sends batches of deletes for the given set of sessions. */ - virtual void removeRecords(LogicalSessionIdSet sessions) = 0; + Status doRemove(const LogicalSessionIdSet& sessions, SendBatchFn send); }; } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.cpp b/src/mongo/db/sessions_collection_mock.cpp index 832213e1223..09deaaa3b1f 100644 --- a/src/mongo/db/sessions_collection_mock.cpp +++ b/src/mongo/db/sessions_collection_mock.cpp @@ -35,7 +35,6 @@ namespace mongo { MockSessionsCollectionImpl::MockSessionsCollectionImpl() : _sessions(), _fetch(stdx::bind(&MockSessionsCollectionImpl::_fetchRecord, this, stdx::placeholders::_1)), - _insert(stdx::bind(&MockSessionsCollectionImpl::_insertRecord, this, stdx::placeholders::_1)), _refresh( stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1)), _remove( @@ -45,10 +44,6 @@ void MockSessionsCollectionImpl::setFetchHook(FetchHook hook) { _fetch = std::move(hook); } -void MockSessionsCollectionImpl::setInsertHook(InsertHook hook) { - _insert = std::move(hook); -} - void MockSessionsCollectionImpl::setRefreshHook(RefreshHook hook) { _refresh = std::move(hook); } @@ -59,26 +54,22 @@ void MockSessionsCollectionImpl::setRemoveHook(RemoveHook hook) { void MockSessionsCollectionImpl::clearHooks() { _fetch = stdx::bind(&MockSessionsCollectionImpl::_fetchRecord, this, stdx::placeholders::_1); - _insert = stdx::bind(&MockSessionsCollectionImpl::_insertRecord, this, stdx::placeholders::_1); _refresh = stdx::bind(&MockSessionsCollectionImpl::_refreshSessions, this, stdx::placeholders::_1); _remove = stdx::bind(&MockSessionsCollectionImpl::_removeRecords, this, stdx::placeholders::_1); } -StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::fetchRecord(LogicalSessionId id) { - return _fetch(std::move(id)); -} - -Status MockSessionsCollectionImpl::insertRecord(LogicalSessionRecord record) { - return _insert(std::move(record)); +StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::fetchRecord( + const LogicalSessionId& id) { + return _fetch(id); } -LogicalSessionIdSet MockSessionsCollectionImpl::refreshSessions(LogicalSessionIdSet sessions) { - return _refresh(std::move(sessions)); +Status MockSessionsCollectionImpl::refreshSessions(const LogicalSessionRecordSet& sessions) { + return _refresh(sessions); } -void MockSessionsCollectionImpl::removeRecords(LogicalSessionIdSet sessions) { - _remove(std::move(sessions)); +Status MockSessionsCollectionImpl::removeRecords(const LogicalSessionIdSet& sessions) { + return _remove(std::move(sessions)); } void MockSessionsCollectionImpl::add(LogicalSessionRecord record) { @@ -105,7 +96,8 @@ const MockSessionsCollectionImpl::SessionMap& MockSessionsCollectionImpl::sessio return _sessions; } -StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord(LogicalSessionId id) { +StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord( + const LogicalSessionId& id) { stdx::unique_lock<stdx::mutex> lk(_mutex); // If we do not have this record, return an error @@ -117,40 +109,22 @@ StatusWith<LogicalSessionRecord> MockSessionsCollectionImpl::_fetchRecord(Logica return it->second; } -Status MockSessionsCollectionImpl::_insertRecord(LogicalSessionRecord record) { - stdx::unique_lock<stdx::mutex> lk(_mutex); - auto res = _sessions.insert({record.getId(), std::move(record)}); - - // We should never try to insert the same record twice. In theory this could - // happen because of a UUID conflict. - if (!res.second) { - return {ErrorCodes::DuplicateSession, "Session already exists in the sessions collection"}; - } - - return Status::OK(); -} - -LogicalSessionIdSet MockSessionsCollectionImpl::_refreshSessions(LogicalSessionIdSet sessions) { - LogicalSessionIdSet notFound{}; - - { - stdx::unique_lock<stdx::mutex> lk(_mutex); - for (auto& lsid : sessions) { - auto it = _sessions.find(lsid); - if (it == _sessions.end()) { - notFound.insert(lsid); - } +Status MockSessionsCollectionImpl::_refreshSessions(const LogicalSessionRecordSet& sessions) { + for (auto& record : sessions) { + if (!has(record.getId())) { + _sessions.insert({record.getId(), record}); } } - - return notFound; + return Status::OK(); } -void MockSessionsCollectionImpl::_removeRecords(LogicalSessionIdSet sessions) { +Status MockSessionsCollectionImpl::_removeRecords(const LogicalSessionIdSet& sessions) { stdx::unique_lock<stdx::mutex> lk(_mutex); for (auto& lsid : sessions) { _sessions.erase(lsid); } + + return Status::OK(); } } // namespace mongo diff --git a/src/mongo/db/sessions_collection_mock.h b/src/mongo/db/sessions_collection_mock.h index 870d0434991..5456e0731a2 100644 --- a/src/mongo/db/sessions_collection_mock.h +++ b/src/mongo/db/sessions_collection_mock.h @@ -58,14 +58,12 @@ public: MockSessionsCollectionImpl(); - using FetchHook = stdx::function<StatusWith<LogicalSessionRecord>(LogicalSessionId)>; - using InsertHook = stdx::function<Status(LogicalSessionRecord)>; - using RefreshHook = stdx::function<LogicalSessionIdSet(LogicalSessionIdSet)>; - using RemoveHook = stdx::function<void(LogicalSessionIdSet)>; + using FetchHook = stdx::function<StatusWith<LogicalSessionRecord>(const LogicalSessionId&)>; + using RefreshHook = stdx::function<Status(const LogicalSessionRecordSet&)>; + using RemoveHook = stdx::function<Status(const LogicalSessionIdSet&)>; // Set custom hooks to override default behavior void setFetchHook(FetchHook hook); - void setInsertHook(InsertHook hook); void setRefreshHook(RefreshHook hook); void setRemoveHook(RemoveHook hook); @@ -73,10 +71,9 @@ public: void clearHooks(); // Forwarding methods from the MockSessionsCollection - StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id); - Status insertRecord(LogicalSessionRecord record); - LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions); - void removeRecords(LogicalSessionIdSet sessions); + StatusWith<LogicalSessionRecord> fetchRecord(const LogicalSessionId& id); + Status refreshSessions(const LogicalSessionRecordSet& sessions); + Status removeRecords(const LogicalSessionIdSet& sessions); // Test-side methods that operate on the _sessions map void add(LogicalSessionRecord record); @@ -87,16 +84,14 @@ public: private: // Default implementations, may be overridden with custom hooks. - StatusWith<LogicalSessionRecord> _fetchRecord(LogicalSessionId id); - Status _insertRecord(LogicalSessionRecord record); - LogicalSessionIdSet _refreshSessions(LogicalSessionIdSet sessions); - void _removeRecords(LogicalSessionIdSet sessions); + StatusWith<LogicalSessionRecord> _fetchRecord(const LogicalSessionId& id); + Status _refreshSessions(const LogicalSessionRecordSet& sessions); + Status _removeRecords(const LogicalSessionIdSet& sessions); stdx::mutex _mutex; SessionMap _sessions; FetchHook _fetch; - InsertHook _insert; RefreshHook _refresh; RemoveHook _remove; }; @@ -111,20 +106,19 @@ public: explicit MockSessionsCollection(std::shared_ptr<MockSessionsCollectionImpl> impl) : _impl(std::move(impl)) {} - StatusWith<LogicalSessionRecord> fetchRecord(LogicalSessionId id) override { - return _impl->fetchRecord(std::move(id)); + StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx, + const LogicalSessionId& id) override { + return _impl->fetchRecord(id); } - Status insertRecord(LogicalSessionRecord record) override { - return _impl->insertRecord(std::move(record)); + Status refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) override { + return _impl->refreshSessions(sessions); } - LogicalSessionIdSet refreshSessions(LogicalSessionIdSet sessions) override { - return _impl->refreshSessions(std::move(sessions)); - } - - void removeRecords(LogicalSessionIdSet sessions) override { - return _impl->removeRecords(std::move(sessions)); + Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override { + return _impl->removeRecords(sessions); } private: diff --git a/src/mongo/db/sessions_collection_standalone.cpp b/src/mongo/db/sessions_collection_standalone.cpp new file mode 100644 index 00000000000..093b61ef959 --- /dev/null +++ b/src/mongo/db/sessions_collection_standalone.cpp @@ -0,0 +1,91 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects + * for all of the code used other than as permitted herein. If you modify + * file(s) with this exception, you may extend this exception to your + * version of the file(s), but you are not obligated to do so. If you do not + * wish to do so, delete this exception statement from your version. If you + * delete this exception statement from all source files in the program, + * then also delete it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include "mongo/db/sessions_collection_standalone.h" + +#include "mongo/client/dbclientinterface.h" +#include "mongo/client/query.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/operation_context.h" + +namespace mongo { + +namespace { + +BSONObj lsidQuery(const LogicalSessionId& lsid) { + return BSON(LogicalSessionRecord::kIdFieldName << lsid.toBSON()); +} +} // namespace + +StatusWith<LogicalSessionRecord> SessionsCollectionStandalone::fetchRecord( + OperationContext* opCtx, const LogicalSessionId& lsid) { + DBDirectClient client(opCtx); + auto cursor = client.query(kSessionsFullNS.toString(), lsidQuery(lsid), 1); + if (!cursor->more()) { + return {ErrorCodes::NoSuchSession, "No matching record in the sessions collection"}; + } + + try { + IDLParserErrorContext ctx("LogicalSessionRecord"); + return LogicalSessionRecord::parse(ctx, cursor->next()); + } catch (...) { + return exceptionToStatus(); + } +} + +Status SessionsCollectionStandalone::refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) { + DBDirectClient client(opCtx); + return doRefresh(sessions, refreshTime, makeSendFn(&client)); +} + +Status SessionsCollectionStandalone::removeRecords(OperationContext* opCtx, + const LogicalSessionIdSet& sessions) { + DBDirectClient client(opCtx); + return doRemove(sessions, makeSendFn(&client)); +} + +SessionsCollection::SendBatchFn SessionsCollectionStandalone::makeSendFn(DBDirectClient* client) { + auto send = [client](BSONObj batch) -> Status { + BSONObj res; + auto ok = client->runCommand(SessionsCollection::kSessionsDb.toString(), batch, res); + if (!ok) { + return {ErrorCodes::UnknownError, + client->getLastError(SessionsCollection::kSessionsDb.toString())}; + } + return Status::OK(); + }; + + return send; +} + + +} // namespace mongo diff --git a/src/mongo/db/sessions_collection_standalone.h b/src/mongo/db/sessions_collection_standalone.h new file mode 100644 index 00000000000..0fee4cbd164 --- /dev/null +++ b/src/mongo/db/sessions_collection_standalone.h @@ -0,0 +1,69 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#pragma once + +#include "mongo/db/logical_session_id.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/util/time_support.h" + +namespace mongo { + +class DBDirectClient; +class OperationContext; + +/** + * Accesses the sessions collection directly for standalone servers. + */ +class SessionsCollectionStandalone : public SessionsCollection { +public: + /** + * Returns a LogicalSessionRecord for the given session id, or an error if + * no such record was found. + */ + StatusWith<LogicalSessionRecord> fetchRecord(OperationContext* opCtx, + const LogicalSessionId& lsid) override; + + /** + * Updates the last-use times on the given sessions to be greater than + * or equal to the current time. + */ + Status refreshSessions(OperationContext* opCtx, + const LogicalSessionRecordSet& sessions, + Date_t refreshTime) override; + + /** + * Removes the authoritative records for the specified sessions. + */ + Status removeRecords(OperationContext* opCtx, const LogicalSessionIdSet& sessions) override; + +private: + SessionsCollection::SendBatchFn makeSendFn(DBDirectClient* client); +}; + +} // namespace mongo diff --git a/src/mongo/dbtests/SConscript b/src/mongo/dbtests/SConscript index 74383934018..18b9e406ea6 100644 --- a/src/mongo/dbtests/SConscript +++ b/src/mongo/dbtests/SConscript @@ -71,6 +71,7 @@ dbtest = env.Program( 'jsobjtests.cpp', 'jsontests.cpp', 'jstests.cpp', + 'logical_sessions_tests.cpp', 'matchertests.cpp', 'mmaptests.cpp', 'mock_dbclient_conn_test.cpp', @@ -126,6 +127,7 @@ dbtest = env.Program( "$BUILD_DIR/mongo/db/repl/replication_consistency_markers_impl", "$BUILD_DIR/mongo/db/repl/replmocks", "$BUILD_DIR/mongo/db/serveronly", + "$BUILD_DIR/mongo/db/sessions_collection_standalone", "$BUILD_DIR/mongo/db/logical_clock", "$BUILD_DIR/mongo/db/logical_time_metadata_hook", "$BUILD_DIR/mongo/db/storage/mmap_v1/paths", diff --git a/src/mongo/dbtests/logical_sessions_tests.cpp b/src/mongo/dbtests/logical_sessions_tests.cpp new file mode 100644 index 00000000000..f642cf5e377 --- /dev/null +++ b/src/mongo/dbtests/logical_sessions_tests.cpp @@ -0,0 +1,196 @@ +/** + * Copyright (C) 2017 MongoDB Inc. + * + * This program is free software: you can redistribute it and/or modify + * it under the terms of the GNU Affero General Public License, version 3, + * as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, + * but WITHOUT ANY WARRANTY; without even the implied warranty of + * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the + * GNU Affero General Public License for more details. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see <http://www.gnu.org/licenses/>. + * + * As a special exception, the copyright holders give permission to link the + * code of portions of this program with the OpenSSL library under certain + * conditions as described in each individual source file and distribute + * linked combinations including the program with the OpenSSL library. You + * must comply with the GNU Affero General Public License in all respects for + * all of the code used other than as permitted herein. If you modify file(s) + * with this exception, you may extend this exception to your version of the + * file(s), but you are not obligated to do so. If you do not wish to do so, + * delete this exception statement from your version. If you delete this + * exception statement from all source files in the program, then also delete + * it in the license file. + */ + +#include "mongo/platform/basic.h" + +#include <memory> + +#include "mongo/client/dbclientinterface.h" +#include "mongo/client/index_spec.h" +#include "mongo/db/dbdirectclient.h" +#include "mongo/db/logical_session_id.h" +#include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context.h" +#include "mongo/db/sessions_collection.h" +#include "mongo/db/sessions_collection_standalone.h" +#include "mongo/dbtests/dbtests.h" +#include "mongo/stdx/memory.h" +#include "mongo/util/time_support.h" + +namespace LogicalSessionTests { + +namespace { +constexpr StringData kTestNS = "admin.system.sessions"_sd; + +LogicalSessionRecord makeRecord(Date_t time = Date_t::now()) { + auto record = makeLogicalSessionRecordForTest(); + record.setLastUse(time); + return record; +} + +Status insertRecord(OperationContext* opCtx, LogicalSessionRecord record) { + DBDirectClient client(opCtx); + + client.insert(kTestNS.toString(), record.toBSON()); + auto errorString = client.getLastError(); + if (errorString.empty()) { + return Status::OK(); + } + + return {ErrorCodes::DuplicateSession, errorString}; +} + +} // namespace + +class SessionsCollectionStandaloneTest { +public: + SessionsCollectionStandaloneTest() + : _collection(stdx::make_unique<SessionsCollectionStandalone>()) { + _opCtx = cc().makeOperationContext(); + DBDirectClient db(opCtx()); + db.remove(ns(), BSONObj()); + } + + virtual ~SessionsCollectionStandaloneTest() { + DBDirectClient db(opCtx()); + db.remove(ns(), BSONObj()); + _opCtx.reset(); + } + + SessionsCollectionStandalone* collection() { + return _collection.get(); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + + std::string ns() { + return SessionsCollection::kSessionsFullNS.toString(); + } + +private: + std::unique_ptr<SessionsCollectionStandalone> _collection; + ServiceContext::UniqueOperationContext _opCtx; +}; + +// Test that removal from this collection works. +class SessionsCollectionStandaloneRemoveTest : public SessionsCollectionStandaloneTest { +public: + void run() { + auto record1 = makeRecord(); + auto record2 = makeRecord(); + + auto res = insertRecord(opCtx(), record1); + ASSERT_OK(res); + res = insertRecord(opCtx(), record2); + ASSERT_OK(res); + + // Remove one record, the other stays + res = collection()->removeRecords(opCtx(), {record1.getId()}); + ASSERT_OK(res); + + auto swRecord = collection()->fetchRecord(opCtx(), record1.getId()); + ASSERT(!swRecord.isOK()); + + swRecord = collection()->fetchRecord(opCtx(), record2.getId()); + ASSERT(swRecord.isOK()); + } +}; + +// Test that refreshing entries in this collection works. +class SessionsCollectionStandaloneRefreshTest : public SessionsCollectionStandaloneTest { +public: + void run() { + DBDirectClient db(opCtx()); + + // Attempt to refresh one active record, should succeed. + auto now = Date_t::now(); + auto thePast = now - Minutes(5); + + auto record1 = makeRecord(thePast); + auto res = insertRecord(opCtx(), record1); + ASSERT_OK(res); + auto resRefresh = collection()->refreshSessions(opCtx(), {record1}, now); + ASSERT(resRefresh.isOK()); + + // The timestamp on the refreshed record should be updated. + auto swRecord = collection()->fetchRecord(opCtx(), record1.getId()); + ASSERT(swRecord.isOK()); + ASSERT_EQ(swRecord.getValue().getLastUse(), now); + + // Clear the collection. + db.remove(ns(), BSONObj()); + + // Attempt to refresh a record that is not present, should upsert it. + auto record2 = makeRecord(thePast); + resRefresh = collection()->refreshSessions(opCtx(), {record2}, now); + ASSERT(resRefresh.isOK()); + + swRecord = collection()->fetchRecord(opCtx(), record2.getId()); + ASSERT(swRecord.isOK()); + + // Clear the collection. + db.remove(ns(), BSONObj()); + + // Attempt a refresh of many records, split into batches. + LogicalSessionRecordSet toRefresh; + int recordCount = 5000; + for (int i = 0; i < recordCount; i++) { + auto record = makeRecord(thePast); + res = insertRecord(opCtx(), record); + + // Refresh some of these records. + if (i % 4 == 0) { + toRefresh.insert(record); + } + } + + // Run the refresh, should succeed. + resRefresh = collection()->refreshSessions(opCtx(), toRefresh, now); + ASSERT(resRefresh.isOK()); + + // Ensure that the right number of timestamps were updated. + auto n = db.count(ns(), BSON("lastUse" << now)); + ASSERT_EQ(n, toRefresh.size()); + } +}; + +class All : public Suite { +public: + All() : Suite("logical_sessions") {} + + void setupTests() { + add<SessionsCollectionStandaloneRemoveTest>(); + add<SessionsCollectionStandaloneRefreshTest>(); + } +}; + +SuiteInstance<All> all; + +} // namespace LogicalSessionTests diff --git a/src/mongo/idl/idl_test.cpp b/src/mongo/idl/idl_test.cpp index a67de83a429..3b0387b412c 100644 --- a/src/mongo/idl/idl_test.cpp +++ b/src/mongo/idl/idl_test.cpp @@ -547,10 +547,10 @@ TEST(IDLNestedStruct, TestDuplicateTypes) { "field3" << BSON("field1" << 4 << "field2" << 5 << "field3" << 6)); auto testStruct = NestedWithDuplicateTypes::parse(ctxt, testDoc); - assert_same_types<decltype(testStruct.getField1()), const RequiredStrictField3&>(); + assert_same_types<decltype(testStruct.getField1()), RequiredStrictField3&>(); assert_same_types<decltype(testStruct.getField2()), const boost::optional<RequiredNonStrictField3>&>(); - assert_same_types<decltype(testStruct.getField3()), const RequiredStrictField3&>(); + assert_same_types<decltype(testStruct.getField3()), RequiredStrictField3&>(); ASSERT_EQUALS(1, testStruct.getField1().getField1()); ASSERT_EQUALS(2, testStruct.getField1().getField2()); @@ -1228,10 +1228,9 @@ TEST(IDLChainedType, TestChainedStruct) { auto testStruct = Chained_struct_mixed::parse(ctxt, testDoc); - assert_same_types<decltype(testStruct.getChained_any_basic_type()), - const Chained_any_basic_type&>(); + assert_same_types<decltype(testStruct.getChained_any_basic_type()), Chained_any_basic_type&>(); assert_same_types<decltype(testStruct.getChainedObjectBasicType()), - const Chained_object_basic_type&>(); + Chained_object_basic_type&>(); ASSERT_EQUALS(testStruct.getField3(), "abc"); diff --git a/src/mongo/util/periodic_runner.h b/src/mongo/util/periodic_runner.h index 21e37d0e438..6b80226aa55 100644 --- a/src/mongo/util/periodic_runner.h +++ b/src/mongo/util/periodic_runner.h @@ -34,6 +34,8 @@ namespace mongo { +class Client; + /** * An interface for objects that run work items at specified intervals. * @@ -41,10 +43,12 @@ namespace mongo { * model they wish. Implementations may choose when to stop running * scheduled jobs (for example, some implementations may stop running * when the server is in global shutdown). + * + * The runner will create client objects that it passes to jobs to use. */ class PeriodicRunner { public: - using Job = stdx::function<void()>; + using Job = stdx::function<void(Client* client)>; struct PeriodicJob { PeriodicJob(Job callable, Milliseconds period) diff --git a/src/mongo/util/periodic_runner_asio.cpp b/src/mongo/util/periodic_runner_asio.cpp index 9fadfc67124..5ff42019340 100644 --- a/src/mongo/util/periodic_runner_asio.cpp +++ b/src/mongo/util/periodic_runner_asio.cpp @@ -95,7 +95,7 @@ void PeriodicRunnerASIO::_scheduleJob(std::weak_ptr<PeriodicJobASIO> job) { } lockedJob->start = _timerFactory->now(); - lockedJob->job(); + lockedJob->job(_client); _io_service.post([this, job]() mutable { _scheduleJob(job); }); }); @@ -110,11 +110,16 @@ Status PeriodicRunnerASIO::startup() { _state = State::kRunning; _thread = stdx::thread([this]() { try { - Client::initThread("PeriodicRunnerASIO"); + auto client = getGlobalServiceContext()->makeClient("PeriodicRunnerASIO"); + _client = client.get(); + Client::setCurrent(std::move(client)); asio::io_service::work workItem(_io_service); std::error_code ec; _io_service.run(ec); + + client = Client::releaseCurrent(); + if (ec) { severe() << "Failure in PeriodicRunnerASIO: " << ec.message(); fassertFailed(40438); diff --git a/src/mongo/util/periodic_runner_asio.h b/src/mongo/util/periodic_runner_asio.h index e5be509f0ea..dcf11429594 100644 --- a/src/mongo/util/periodic_runner_asio.h +++ b/src/mongo/util/periodic_runner_asio.h @@ -37,6 +37,8 @@ namespace mongo { +class Client; + /** * A PeriodicRunner implementation that uses the ASIO library's eventing system * to schedule and run jobs at regular intervals. @@ -106,6 +108,7 @@ private: asio::io_service _io_service; asio::io_service::strand _strand; + Client* _client; stdx::thread _thread; std::unique_ptr<executor::AsyncTimerFactoryInterface> _timerFactory; diff --git a/src/mongo/util/periodic_runner_asio_test.cpp b/src/mongo/util/periodic_runner_asio_test.cpp index 7a5e7b5fc04..86397801b11 100644 --- a/src/mongo/util/periodic_runner_asio_test.cpp +++ b/src/mongo/util/periodic_runner_asio_test.cpp @@ -38,6 +38,9 @@ #include "mongo/util/periodic_runner_asio.h" namespace mongo { + +class Client; + namespace { class PeriodicRunnerASIOTestNoSetup : public unittest::Test { @@ -91,7 +94,7 @@ TEST_F(PeriodicRunnerASIOTest, OneJobTest) { // Add a job, ensure that it runs once PeriodicRunner::PeriodicJob job( - [&count, &mutex, &cv] { + [&count, &mutex, &cv](Client*) { { stdx::unique_lock<stdx::mutex> lk(mutex); count++; @@ -128,7 +131,7 @@ TEST_F(PeriodicRunnerASIOTestNoSetup, ScheduleBeforeStartupTest) { // Schedule a job before startup PeriodicRunner::PeriodicJob job( - [&count, &mutex, &cv] { + [&count, &mutex, &cv](Client*) { { stdx::unique_lock<stdx::mutex> lk(mutex); count++; @@ -153,7 +156,7 @@ TEST_F(PeriodicRunnerASIOTest, ScheduleAfterShutdownTest) { Milliseconds interval{5}; // Schedule a job before shutdown - PeriodicRunner::PeriodicJob job([&count] { count++; }, interval); + PeriodicRunner::PeriodicJob job([&count](Client*) { count++; }, interval); runner()->scheduleJob(std::move(job)); @@ -185,7 +188,7 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { // Add two jobs, ensure they both run the proper number of times PeriodicRunner::PeriodicJob jobA( - [&countA, &mutex, &cv] { + [&countA, &mutex, &cv](Client*) { { stdx::unique_lock<stdx::mutex> lk(mutex); countA++; @@ -195,7 +198,7 @@ TEST_F(PeriodicRunnerASIOTest, TwoJobsTest) { intervalA); PeriodicRunner::PeriodicJob jobB( - [&countB, &mutex, &cv] { + [&countB, &mutex, &cv](Client*) { { stdx::unique_lock<stdx::mutex> lk(mutex); countB++; |