diff options
Diffstat (limited to 'src/mongo/db/logical_session_cache_test.cpp')
-rw-r--r-- | src/mongo/db/logical_session_cache_test.cpp | 332 |
1 files changed, 127 insertions, 205 deletions
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp index 381801cecab..436f0af2267 100644 --- a/src/mongo/db/logical_session_cache_test.cpp +++ b/src/mongo/db/logical_session_cache_test.cpp @@ -33,6 +33,8 @@ #include "mongo/db/logical_session_cache.h" #include "mongo/db/logical_session_id.h" #include "mongo/db/logical_session_id_helpers.h" +#include "mongo/db/operation_context_noop.h" +#include "mongo/db/service_context_noop.h" #include "mongo/db/service_liason_mock.h" #include "mongo/db/sessions_collection_mock.h" #include "mongo/stdx/future.h" @@ -42,8 +44,7 @@ namespace mongo { namespace { -const Milliseconds kSessionTimeout = - duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultTimeout); +const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout); const Milliseconds kForceRefresh = duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultRefresh); @@ -60,6 +61,11 @@ public: _sessions(std::make_shared<MockSessionsCollectionImpl>()) {} void setUp() override { + auto client = serviceContext.makeClient("testClient"); + _opCtx = client->makeOperationContext(); + _client = client.get(); + Client::setCurrent(std::move(client)); + auto mockService = stdx::make_unique<MockServiceLiason>(_service); auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions); _cache = @@ -67,7 +73,12 @@ public: } void tearDown() override { + if (_opCtx) { + _opCtx.reset(); + } + _service->join(); + auto client = Client::releaseCurrent(); } void waitUntilRefreshScheduled() { @@ -88,11 +99,32 @@ public: return _sessions; } + void setOpCtx() { + _opCtx = client()->makeOperationContext(); + } + + void clearOpCtx() { + _opCtx.reset(); + } + + OperationContext* opCtx() { + return _opCtx.get(); + } + + Client* client() { + return _client; + } + private: + ServiceContextNoop serviceContext; + ServiceContext::UniqueOperationContext _opCtx; + std::shared_ptr<MockServiceLiasonImpl> _service; std::shared_ptr<MockSessionsCollectionImpl> _sessions; std::unique_ptr<LogicalSessionCache> _cache; + + Client* _client; }; // Test that session cache fetches new records from the sessions collection @@ -100,12 +132,12 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) { auto lsid = makeLogicalSessionIdForTest(); // When the record is not present (and not in the sessions collection) returns an error - auto res = cache()->fetchAndPromote(lsid); + auto res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(!res.isOK()); // When the record is not present (but is in the sessions collection) returns it sessions()->add(makeLogicalSessionRecord(lsid, service()->now())); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // When the record is present in the cache, returns it @@ -115,7 +147,7 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) { return {ErrorCodes::NoSuchSession, "no such session"}; }); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); } @@ -133,7 +165,7 @@ TEST_F(LogicalSessionCacheTest, TestCacheHitsOnly) { ASSERT(!res.isOK()); // When the record is present, returns the owner - cache()->fetchAndPromote(lsid).transitional_ignore(); + cache()->fetchAndPromote(opCtx(), lsid).transitional_ignore(); res = cache()->promote(lsid); ASSERT(res.isOK()); } @@ -150,17 +182,17 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) { // Fast forward time and fetch service()->fastForward(Milliseconds(500)); ASSERT(start != service()->now()); - auto res = cache()->fetchAndPromote(lsid); + auto res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // Now that we fetched, lifetime of session should be extended service()->fastForward(kSessionTimeout - Milliseconds(500)); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // We fetched again, so lifetime extended again service()->fastForward(kSessionTimeout - Milliseconds(10)); - res = cache()->fetchAndPromote(lsid); + res = cache()->fetchAndPromote(opCtx(), lsid); ASSERT(res.isOK()); // Fast forward and hit-only fetch @@ -181,56 +213,63 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) { // Test the startSession method TEST_F(LogicalSessionCacheTest, StartSession) { - auto lsid = makeLogicalSessionIdForTest(); + auto record = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now()); + auto lsid = record.getId(); // Test starting a new session - auto res = cache()->startSession(lsid); + auto res = cache()->startSession(opCtx(), record); ASSERT(res.isOK()); + + // Record will not be in the collection yet; refresh must happen first. + ASSERT(!sessions()->has(lsid)); + + // Do refresh, cached records should get flushed to collection. + clearOpCtx(); + cache()->refreshNow(client()); ASSERT(sessions()->has(lsid)); - // Try to start a session that is already in the sessions collection and our - // local cache, should fail - res = cache()->startSession(lsid); - ASSERT(!res.isOK()); + // Try to start the same session again, should succeed. + res = cache()->startSession(opCtx(), record); + ASSERT(res.isOK()); // Try to start a session that is already in the sessions collection but - // is not in our local cache, should fail + // is not in our local cache, should succeed. auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now()); - auto lsid2 = record2.getId(); - sessions()->add(std::move(record2)); - res = cache()->startSession(lsid2); - ASSERT(!res.isOK()); + sessions()->add(record2); + res = cache()->startSession(opCtx(), record2); + ASSERT(res.isOK()); // Try to start a session that has expired from our cache, and is no // longer in the sessions collection, should succeed service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); sessions()->remove(lsid); ASSERT(!sessions()->has(lsid)); - res = cache()->startSession(lsid); + res = cache()->startSession(opCtx(), record); ASSERT(res.isOK()); - ASSERT(sessions()->has(lsid)); } // Test that records in the cache are properly refreshed until they expire TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { // Insert two records into the cache - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - cache()->startSession(lsid2).transitional_ignore(); + auto record1 = makeLogicalSessionRecordForTest(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record1).transitional_ignore(); + cache()->startSession(opCtx(), record2).transitional_ignore(); stdx::promise<int> hitRefresh; auto refreshFuture = hitRefresh.get_future(); // Advance time to first refresh point, check that refresh happens, and // that it includes both our records - sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) { + sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) { hitRefresh.set_value(sessions.size()); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Wait for the refresh to happen + clearOpCtx(); service()->fastForward(kForceRefresh); + cache()->refreshNow(client()); refreshFuture.wait(); ASSERT_EQ(refreshFuture.get(), 2); @@ -240,98 +279,39 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) { auto refresh2Future = refresh2.get_future(); // Use one of the records - auto res = cache()->fetchAndPromote(lsid1); + setOpCtx(); + auto res = cache()->fetchAndPromote(opCtx(), record1.getId()); ASSERT(res.isOK()); // Advance time so that one record expires // Ensure that first record was refreshed, and second was thrown away - sessions()->setRefreshHook([&refresh2](LogicalSessionIdSet sessions) { + sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) { // We should only have one record here, the other should have expired ASSERT_EQ(sessions.size(), size_t(1)); - refresh2.set_value(*(sessions.begin())); - return LogicalSessionIdSet{}; + refresh2.set_value(sessions.begin()->getId()); + return Status::OK(); }); - // Wait until the second job has been scheduled - waitUntilRefreshScheduled(); - + clearOpCtx(); service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1)); + cache()->refreshNow(client()); refresh2Future.wait(); - ASSERT_EQ(refresh2Future.get(), lsid1); -} - -// Test that cache deletes records that fail to refresh -TEST_F(LogicalSessionCacheTest, CacheDeletesRecordsThatFailToRefresh) { - // Put two sessions into the cache - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Record 1 fails to refresh - sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) { - ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{lsid1}; - }); - - // Force a refresh - service()->fastForward(kForceRefresh); - refreshFuture.wait(); - - // Ensure that one record is still there and the other is gone - auto res = cache()->promote(lsid1); - ASSERT(!res.isOK()); - res = cache()->promote(lsid2); - ASSERT(res.isOK()); -} - -// Test that we don't remove records that fail to refresh if they are active on the service -TEST_F(LogicalSessionCacheTest, KeepActiveSessionAliveEvenIfRefreshFails) { - // Put two sessions into the cache, one into the service - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid1).transitional_ignore(); - service()->add(lsid1); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // SignedLsid 1 fails to refresh - sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) { - ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{lsid1}; - }); - - // Force a refresh - service()->fastForward(kForceRefresh); - refreshFuture.wait(); - - // Ensure that both lsids are still there - auto res = cache()->promote(lsid1); - ASSERT(res.isOK()); - res = cache()->promote(lsid2); - ASSERT(res.isOK()); + ASSERT_EQ(refresh2Future.get(), record1.getId()); } // Test that session cache properly expires lsids after 30 minutes of no use TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) { // Insert a lsid - auto lsid = makeLogicalSessionIdForTest(); - cache()->startSession(lsid).transitional_ignore(); - auto res = cache()->promote(lsid); + auto record = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record).transitional_ignore(); + auto res = cache()->promote(record.getId()); ASSERT(res.isOK()); // Force it to expire service()->fastForward(Milliseconds(kSessionTimeout.count() + 5)); // Check that it is no longer in the cache - res = cache()->promote(lsid); + res = cache()->promote(record.getId()); ASSERT(!res.isOK()); } @@ -342,47 +322,31 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) { // Insert one active lsid on the service, none in the cache service()->add(lsid); - stdx::mutex mutex; - stdx::condition_variable cv; int count = 0; - sessions()->setRefreshHook([&cv, &mutex, &count, &lsid](LogicalSessionIdSet sessions) { - ASSERT_EQ(*(sessions.begin()), lsid); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - count++; - } - cv.notify_all(); - - return LogicalSessionIdSet{}; + sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) { + ASSERT_EQ(sessions.size(), size_t(1)); + ASSERT_EQ(sessions.begin()->getId(), lsid); + count++; + return Status::OK(); }); + clearOpCtx(); + // Force a refresh, it should refresh our active session service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 1; }); - } - - // Wait until the next job has been scheduled - waitUntilRefreshScheduled(); + cache()->refreshNow(client()); + ASSERT_EQ(count, 1); // Force a session timeout, session is still on the service service()->fastForward(kSessionTimeout); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 2; }); - } - - // Wait until the next job has been scheduled - waitUntilRefreshScheduled(); + cache()->refreshNow(client()); + ASSERT_EQ(count, 2); // Force another refresh, check that it refreshes that active lsid again service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&count] { return count == 3; }); - } + cache()->refreshNow(client()); + ASSERT_EQ(count, 3); } // Test that the set of lsids we refresh is a sum of cached + active lsids @@ -390,45 +354,39 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) { // Put one session into the cache, one into the service auto lsid1 = makeLogicalSessionIdForTest(); service()->add(lsid1); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid2).transitional_ignore(); - - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record2).transitional_ignore(); - // Both lsids refresh - sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) { + // Both signedLsids refresh + sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(2)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test large sets of cache-only session lsids TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) { int count = LogicalSessionCache::kLogicalSessionCacheDefaultCapacity; for (int i = 0; i < count; i++) { - auto lsid = makeLogicalSessionIdForTest(); - cache()->startSession(lsid).transitional_ignore(); + auto record = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record).transitional_ignore(); } - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Check that all lsids refresh - sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) { + // Check that all signedLsids refresh + sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(count)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test larger sets of service-only session lsids @@ -439,19 +397,16 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) { service()->add(lsid); } - stdx::promise<void> hitRefresh; - auto refreshFuture = hitRefresh.get_future(); - - // Check that all lsids refresh - sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) { + // Check that all signedLsids refresh + sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) { ASSERT_EQ(sessions.size(), size_t(count)); - hitRefresh.set_value(); - return LogicalSessionIdSet{}; + return Status::OK(); }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - refreshFuture.wait(); + cache()->refreshNow(client()); } // Test larger mixed sets of cache/service active sessions @@ -461,77 +416,44 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) { auto lsid = makeLogicalSessionIdForTest(); service()->add(lsid); - auto lsid2 = makeLogicalSessionIdForTest(); - cache()->startSession(lsid2).transitional_ignore(); + auto record2 = makeLogicalSessionRecordForTest(); + cache()->startSession(opCtx(), record2).transitional_ignore(); } - stdx::mutex mutex; - stdx::condition_variable cv; - int refreshes = 0; int nRefreshed = 0; // Check that all lsids refresh successfully - sessions()->setRefreshHook( - [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - refreshes++; - nRefreshed = sessions.size(); - } - cv.notify_all(); - - return LogicalSessionIdSet{}; - }); + sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { + nRefreshed = sessions.size(); + return Status::OK(); + }); // Force a refresh + clearOpCtx(); service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 1; }); - } + cache()->refreshNow(client()); ASSERT_EQ(nRefreshed, count * 2); // Remove all of the service sessions, should just refresh the cache entries - // (and make all but one fail to refresh) service()->clear(); - sessions()->setRefreshHook( - [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) { - { - stdx::unique_lock<stdx::mutex> lk(mutex); - refreshes++; - nRefreshed = sessions.size(); - } - cv.notify_all(); - - sessions.erase(sessions.begin()); - return sessions; - }); - - // Wait for job to be scheduled - waitUntilRefreshScheduled(); + sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) { + nRefreshed = sessions.size(); + return Status::OK(); + }); // Force another refresh service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 2; }); - } + cache()->refreshNow(client()); // We should not have refreshed any sessions from the service, only the cache ASSERT_EQ(nRefreshed, count); - // Wait for job to be scheduled - waitUntilRefreshScheduled(); - // Force a third refresh service()->fastForward(kForceRefresh); - { - stdx::unique_lock<stdx::mutex> lk(mutex); - cv.wait(lk, [&refreshes] { return refreshes == 3; }); - } + cache()->refreshNow(client()); - // Since all but one lsid failed to refresh, third set should just have one lsid - ASSERT_EQ(nRefreshed, 1); + // Again, we should have only refreshed sessions from the cache + ASSERT_EQ(nRefreshed, count); } } // namespace |