summaryrefslogtreecommitdiff
path: root/src/mongo/db/logical_session_cache_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/logical_session_cache_test.cpp')
-rw-r--r--src/mongo/db/logical_session_cache_test.cpp332
1 files changed, 127 insertions, 205 deletions
diff --git a/src/mongo/db/logical_session_cache_test.cpp b/src/mongo/db/logical_session_cache_test.cpp
index 381801cecab..436f0af2267 100644
--- a/src/mongo/db/logical_session_cache_test.cpp
+++ b/src/mongo/db/logical_session_cache_test.cpp
@@ -33,6 +33,8 @@
#include "mongo/db/logical_session_cache.h"
#include "mongo/db/logical_session_id.h"
#include "mongo/db/logical_session_id_helpers.h"
+#include "mongo/db/operation_context_noop.h"
+#include "mongo/db/service_context_noop.h"
#include "mongo/db/service_liason_mock.h"
#include "mongo/db/sessions_collection_mock.h"
#include "mongo/stdx/future.h"
@@ -42,8 +44,7 @@
namespace mongo {
namespace {
-const Milliseconds kSessionTimeout =
- duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultTimeout);
+const Milliseconds kSessionTimeout = duration_cast<Milliseconds>(kLogicalSessionDefaultTimeout);
const Milliseconds kForceRefresh =
duration_cast<Milliseconds>(LogicalSessionCache::kLogicalSessionDefaultRefresh);
@@ -60,6 +61,11 @@ public:
_sessions(std::make_shared<MockSessionsCollectionImpl>()) {}
void setUp() override {
+ auto client = serviceContext.makeClient("testClient");
+ _opCtx = client->makeOperationContext();
+ _client = client.get();
+ Client::setCurrent(std::move(client));
+
auto mockService = stdx::make_unique<MockServiceLiason>(_service);
auto mockSessions = stdx::make_unique<MockSessionsCollection>(_sessions);
_cache =
@@ -67,7 +73,12 @@ public:
}
void tearDown() override {
+ if (_opCtx) {
+ _opCtx.reset();
+ }
+
_service->join();
+ auto client = Client::releaseCurrent();
}
void waitUntilRefreshScheduled() {
@@ -88,11 +99,32 @@ public:
return _sessions;
}
+ void setOpCtx() {
+ _opCtx = client()->makeOperationContext();
+ }
+
+ void clearOpCtx() {
+ _opCtx.reset();
+ }
+
+ OperationContext* opCtx() {
+ return _opCtx.get();
+ }
+
+ Client* client() {
+ return _client;
+ }
+
private:
+ ServiceContextNoop serviceContext;
+ ServiceContext::UniqueOperationContext _opCtx;
+
std::shared_ptr<MockServiceLiasonImpl> _service;
std::shared_ptr<MockSessionsCollectionImpl> _sessions;
std::unique_ptr<LogicalSessionCache> _cache;
+
+ Client* _client;
};
// Test that session cache fetches new records from the sessions collection
@@ -100,12 +132,12 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) {
auto lsid = makeLogicalSessionIdForTest();
// When the record is not present (and not in the sessions collection) returns an error
- auto res = cache()->fetchAndPromote(lsid);
+ auto res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(!res.isOK());
// When the record is not present (but is in the sessions collection) returns it
sessions()->add(makeLogicalSessionRecord(lsid, service()->now()));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// When the record is present in the cache, returns it
@@ -115,7 +147,7 @@ TEST_F(LogicalSessionCacheTest, CacheFetchesNewRecords) {
return {ErrorCodes::NoSuchSession, "no such session"};
});
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
}
@@ -133,7 +165,7 @@ TEST_F(LogicalSessionCacheTest, TestCacheHitsOnly) {
ASSERT(!res.isOK());
// When the record is present, returns the owner
- cache()->fetchAndPromote(lsid).transitional_ignore();
+ cache()->fetchAndPromote(opCtx(), lsid).transitional_ignore();
res = cache()->promote(lsid);
ASSERT(res.isOK());
}
@@ -150,17 +182,17 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) {
// Fast forward time and fetch
service()->fastForward(Milliseconds(500));
ASSERT(start != service()->now());
- auto res = cache()->fetchAndPromote(lsid);
+ auto res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// Now that we fetched, lifetime of session should be extended
service()->fastForward(kSessionTimeout - Milliseconds(500));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// We fetched again, so lifetime extended again
service()->fastForward(kSessionTimeout - Milliseconds(10));
- res = cache()->fetchAndPromote(lsid);
+ res = cache()->fetchAndPromote(opCtx(), lsid);
ASSERT(res.isOK());
// Fast forward and hit-only fetch
@@ -181,56 +213,63 @@ TEST_F(LogicalSessionCacheTest, FetchUpdatesLastUse) {
// Test the startSession method
TEST_F(LogicalSessionCacheTest, StartSession) {
- auto lsid = makeLogicalSessionIdForTest();
+ auto record = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now());
+ auto lsid = record.getId();
// Test starting a new session
- auto res = cache()->startSession(lsid);
+ auto res = cache()->startSession(opCtx(), record);
ASSERT(res.isOK());
+
+ // Record will not be in the collection yet; refresh must happen first.
+ ASSERT(!sessions()->has(lsid));
+
+ // Do refresh, cached records should get flushed to collection.
+ clearOpCtx();
+ cache()->refreshNow(client());
ASSERT(sessions()->has(lsid));
- // Try to start a session that is already in the sessions collection and our
- // local cache, should fail
- res = cache()->startSession(lsid);
- ASSERT(!res.isOK());
+ // Try to start the same session again, should succeed.
+ res = cache()->startSession(opCtx(), record);
+ ASSERT(res.isOK());
// Try to start a session that is already in the sessions collection but
- // is not in our local cache, should fail
+ // is not in our local cache, should succeed.
auto record2 = makeLogicalSessionRecord(makeLogicalSessionIdForTest(), service()->now());
- auto lsid2 = record2.getId();
- sessions()->add(std::move(record2));
- res = cache()->startSession(lsid2);
- ASSERT(!res.isOK());
+ sessions()->add(record2);
+ res = cache()->startSession(opCtx(), record2);
+ ASSERT(res.isOK());
// Try to start a session that has expired from our cache, and is no
// longer in the sessions collection, should succeed
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
sessions()->remove(lsid);
ASSERT(!sessions()->has(lsid));
- res = cache()->startSession(lsid);
+ res = cache()->startSession(opCtx(), record);
ASSERT(res.isOK());
- ASSERT(sessions()->has(lsid));
}
// Test that records in the cache are properly refreshed until they expire
TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
// Insert two records into the cache
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- cache()->startSession(lsid2).transitional_ignore();
+ auto record1 = makeLogicalSessionRecordForTest();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record1).transitional_ignore();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
stdx::promise<int> hitRefresh;
auto refreshFuture = hitRefresh.get_future();
// Advance time to first refresh point, check that refresh happens, and
// that it includes both our records
- sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) {
+ sessions()->setRefreshHook([&hitRefresh](const LogicalSessionRecordSet& sessions) {
hitRefresh.set_value(sessions.size());
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Wait for the refresh to happen
+ clearOpCtx();
service()->fastForward(kForceRefresh);
+ cache()->refreshNow(client());
refreshFuture.wait();
ASSERT_EQ(refreshFuture.get(), 2);
@@ -240,98 +279,39 @@ TEST_F(LogicalSessionCacheTest, CacheRefreshesOwnRecords) {
auto refresh2Future = refresh2.get_future();
// Use one of the records
- auto res = cache()->fetchAndPromote(lsid1);
+ setOpCtx();
+ auto res = cache()->fetchAndPromote(opCtx(), record1.getId());
ASSERT(res.isOK());
// Advance time so that one record expires
// Ensure that first record was refreshed, and second was thrown away
- sessions()->setRefreshHook([&refresh2](LogicalSessionIdSet sessions) {
+ sessions()->setRefreshHook([&refresh2](const LogicalSessionRecordSet& sessions) {
// We should only have one record here, the other should have expired
ASSERT_EQ(sessions.size(), size_t(1));
- refresh2.set_value(*(sessions.begin()));
- return LogicalSessionIdSet{};
+ refresh2.set_value(sessions.begin()->getId());
+ return Status::OK();
});
- // Wait until the second job has been scheduled
- waitUntilRefreshScheduled();
-
+ clearOpCtx();
service()->fastForward(kSessionTimeout - kForceRefresh + Milliseconds(1));
+ cache()->refreshNow(client());
refresh2Future.wait();
- ASSERT_EQ(refresh2Future.get(), lsid1);
-}
-
-// Test that cache deletes records that fail to refresh
-TEST_F(LogicalSessionCacheTest, CacheDeletesRecordsThatFailToRefresh) {
- // Put two sessions into the cache
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Record 1 fails to refresh
- sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) {
- ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{lsid1};
- });
-
- // Force a refresh
- service()->fastForward(kForceRefresh);
- refreshFuture.wait();
-
- // Ensure that one record is still there and the other is gone
- auto res = cache()->promote(lsid1);
- ASSERT(!res.isOK());
- res = cache()->promote(lsid2);
- ASSERT(res.isOK());
-}
-
-// Test that we don't remove records that fail to refresh if they are active on the service
-TEST_F(LogicalSessionCacheTest, KeepActiveSessionAliveEvenIfRefreshFails) {
- // Put two sessions into the cache, one into the service
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid1).transitional_ignore();
- service()->add(lsid1);
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // SignedLsid 1 fails to refresh
- sessions()->setRefreshHook([&hitRefresh, &lsid1](LogicalSessionIdSet sessions) {
- ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{lsid1};
- });
-
- // Force a refresh
- service()->fastForward(kForceRefresh);
- refreshFuture.wait();
-
- // Ensure that both lsids are still there
- auto res = cache()->promote(lsid1);
- ASSERT(res.isOK());
- res = cache()->promote(lsid2);
- ASSERT(res.isOK());
+ ASSERT_EQ(refresh2Future.get(), record1.getId());
}
// Test that session cache properly expires lsids after 30 minutes of no use
TEST_F(LogicalSessionCacheTest, BasicSessionExpiration) {
// Insert a lsid
- auto lsid = makeLogicalSessionIdForTest();
- cache()->startSession(lsid).transitional_ignore();
- auto res = cache()->promote(lsid);
+ auto record = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record).transitional_ignore();
+ auto res = cache()->promote(record.getId());
ASSERT(res.isOK());
// Force it to expire
service()->fastForward(Milliseconds(kSessionTimeout.count() + 5));
// Check that it is no longer in the cache
- res = cache()->promote(lsid);
+ res = cache()->promote(record.getId());
ASSERT(!res.isOK());
}
@@ -342,47 +322,31 @@ TEST_F(LogicalSessionCacheTest, LongRunningQueriesAreRefreshed) {
// Insert one active lsid on the service, none in the cache
service()->add(lsid);
- stdx::mutex mutex;
- stdx::condition_variable cv;
int count = 0;
- sessions()->setRefreshHook([&cv, &mutex, &count, &lsid](LogicalSessionIdSet sessions) {
- ASSERT_EQ(*(sessions.begin()), lsid);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- count++;
- }
- cv.notify_all();
-
- return LogicalSessionIdSet{};
+ sessions()->setRefreshHook([&count, &lsid](const LogicalSessionRecordSet& sessions) {
+ ASSERT_EQ(sessions.size(), size_t(1));
+ ASSERT_EQ(sessions.begin()->getId(), lsid);
+ count++;
+ return Status::OK();
});
+ clearOpCtx();
+
// Force a refresh, it should refresh our active session
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 1; });
- }
-
- // Wait until the next job has been scheduled
- waitUntilRefreshScheduled();
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 1);
// Force a session timeout, session is still on the service
service()->fastForward(kSessionTimeout);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 2; });
- }
-
- // Wait until the next job has been scheduled
- waitUntilRefreshScheduled();
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 2);
// Force another refresh, check that it refreshes that active lsid again
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&count] { return count == 3; });
- }
+ cache()->refreshNow(client());
+ ASSERT_EQ(count, 3);
}
// Test that the set of lsids we refresh is a sum of cached + active lsids
@@ -390,45 +354,39 @@ TEST_F(LogicalSessionCacheTest, RefreshCachedAndServiceSignedLsidsTogether) {
// Put one session into the cache, one into the service
auto lsid1 = makeLogicalSessionIdForTest();
service()->add(lsid1);
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid2).transitional_ignore();
-
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
- // Both lsids refresh
- sessions()->setRefreshHook([&hitRefresh](LogicalSessionIdSet sessions) {
+ // Both signedLsids refresh
+ sessions()->setRefreshHook([](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(2));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test large sets of cache-only session lsids
TEST_F(LogicalSessionCacheTest, ManySignedLsidsInCacheRefresh) {
int count = LogicalSessionCache::kLogicalSessionCacheDefaultCapacity;
for (int i = 0; i < count; i++) {
- auto lsid = makeLogicalSessionIdForTest();
- cache()->startSession(lsid).transitional_ignore();
+ auto record = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record).transitional_ignore();
}
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Check that all lsids refresh
- sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) {
+ // Check that all signedLsids refresh
+ sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(count));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test larger sets of service-only session lsids
@@ -439,19 +397,16 @@ TEST_F(LogicalSessionCacheTest, ManyLongRunningSessionsRefresh) {
service()->add(lsid);
}
- stdx::promise<void> hitRefresh;
- auto refreshFuture = hitRefresh.get_future();
-
- // Check that all lsids refresh
- sessions()->setRefreshHook([&hitRefresh, &count](LogicalSessionIdSet sessions) {
+ // Check that all signedLsids refresh
+ sessions()->setRefreshHook([&count](const LogicalSessionRecordSet& sessions) {
ASSERT_EQ(sessions.size(), size_t(count));
- hitRefresh.set_value();
- return LogicalSessionIdSet{};
+ return Status::OK();
});
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- refreshFuture.wait();
+ cache()->refreshNow(client());
}
// Test larger mixed sets of cache/service active sessions
@@ -461,77 +416,44 @@ TEST_F(LogicalSessionCacheTest, ManySessionsRefreshComboDeluxe) {
auto lsid = makeLogicalSessionIdForTest();
service()->add(lsid);
- auto lsid2 = makeLogicalSessionIdForTest();
- cache()->startSession(lsid2).transitional_ignore();
+ auto record2 = makeLogicalSessionRecordForTest();
+ cache()->startSession(opCtx(), record2).transitional_ignore();
}
- stdx::mutex mutex;
- stdx::condition_variable cv;
- int refreshes = 0;
int nRefreshed = 0;
// Check that all lsids refresh successfully
- sessions()->setRefreshHook(
- [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- refreshes++;
- nRefreshed = sessions.size();
- }
- cv.notify_all();
-
- return LogicalSessionIdSet{};
- });
+ sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
+ nRefreshed = sessions.size();
+ return Status::OK();
+ });
// Force a refresh
+ clearOpCtx();
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 1; });
- }
+ cache()->refreshNow(client());
ASSERT_EQ(nRefreshed, count * 2);
// Remove all of the service sessions, should just refresh the cache entries
- // (and make all but one fail to refresh)
service()->clear();
- sessions()->setRefreshHook(
- [&refreshes, &mutex, &cv, &nRefreshed](LogicalSessionIdSet sessions) {
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- refreshes++;
- nRefreshed = sessions.size();
- }
- cv.notify_all();
-
- sessions.erase(sessions.begin());
- return sessions;
- });
-
- // Wait for job to be scheduled
- waitUntilRefreshScheduled();
+ sessions()->setRefreshHook([&nRefreshed](const LogicalSessionRecordSet& sessions) {
+ nRefreshed = sessions.size();
+ return Status::OK();
+ });
// Force another refresh
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 2; });
- }
+ cache()->refreshNow(client());
// We should not have refreshed any sessions from the service, only the cache
ASSERT_EQ(nRefreshed, count);
- // Wait for job to be scheduled
- waitUntilRefreshScheduled();
-
// Force a third refresh
service()->fastForward(kForceRefresh);
- {
- stdx::unique_lock<stdx::mutex> lk(mutex);
- cv.wait(lk, [&refreshes] { return refreshes == 3; });
- }
+ cache()->refreshNow(client());
- // Since all but one lsid failed to refresh, third set should just have one lsid
- ASSERT_EQ(nRefreshed, 1);
+ // Again, we should have only refreshed sessions from the cache
+ ASSERT_EQ(nRefreshed, count);
}
} // namespace