diff options
author | Cheahuychou Mao <mao.cheahuychou@gmail.com> | 2021-07-30 06:51:46 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2021-08-14 19:06:09 +0000 |
commit | 632105d33ab6931e86626db08160800d41bb1329 (patch) | |
tree | f83fb20871b79b90dda37130489745223cefa350 /src/mongo/db/session_catalog_test.cpp | |
parent | dcfb71ae6c7e0f5b2809a4e04bb534a1d6b0ce12 (diff) | |
download | mongo-632105d33ab6931e86626db08160800d41bb1329.tar.gz |
SERVER-58751 Support internal sessions
Diffstat (limited to 'src/mongo/db/session_catalog_test.cpp')
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 765 |
1 files changed, 522 insertions, 243 deletions
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index a83e6d4c240..a89ee2561c1 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -27,16 +27,21 @@ * it in the license file. */ +#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault + #include "mongo/platform/basic.h" #include <memory> +#include "mongo/db/cancelable_operation_context.h" #include "mongo/db/service_context_test_fixture.h" #include "mongo/db/session_catalog.h" +#include "mongo/logv2/log.h" #include "mongo/stdx/future.h" #include "mongo/unittest/barrier.h" #include "mongo/unittest/death_test.h" #include "mongo/unittest/unittest.h" +#include "mongo/util/concurrency/thread_pool.h" #include "mongo/util/scopeguard.h" namespace mongo { @@ -81,6 +86,107 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSession) { ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); } +TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSessionWithTxnNumber) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid = makeLogicalSessionIdWithTxnNumberForTest(parentLsid); + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CheckoutAndReleaseSessionWithTxnUUID) { + auto parentLsid = makeLogicalSessionIdForTest(); + auto childLsid = makeLogicalSessionIdWithTxnNumberForTest(parentLsid); + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckOutParentSessionOfCheckedOutSession) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + _opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(_opCtx); + + // Verify that the parent session cannot be checked out until the child session is checked + // back in. + auto future = stdx::async(stdx::launch::async, [this, parentLsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(parentLsid); + OperationContextSession ocs(opCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckOutChildSessionOfCheckedOutSession) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + _opCtx->setLogicalSessionId(parentLsid); + OperationContextSession ocs(_opCtx); + + // Verify that the child session cannot be checked out until the parent session is checked + // back in. + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(opCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + +TEST_F(SessionCatalogTestWithDefaultOpCtx, CannotCheckoutMultipleChildSessionsConcurrently) { + auto runTest = [&](const LogicalSessionId& childLsid0, const LogicalSessionId& childLsid1) { + _opCtx->setLogicalSessionId(childLsid0); + OperationContextSession ocs(_opCtx); + + // Verify that another child session cannot be checked out until both the child session + // above and the parent session are checked back in. + auto future = stdx::async(stdx::launch::async, [this, childLsid1] { + ThreadClient tc(getServiceContext()); + auto childSessionOpCtx1 = cc().makeOperationContext(); + childSessionOpCtx1->setLogicalSessionId(childLsid1); + OperationContextSession ocs(childSessionOpCtx1.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(makeLogicalSessionIdWithTxnNumberForTest(parentLsid, 0), + makeLogicalSessionIdWithTxnNumberForTest(parentLsid, 1)); + runTest(makeLogicalSessionIdWithTxnUUIDForTest(parentLsid), + makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); + runTest(makeLogicalSessionIdWithTxnNumberForTest(parentLsid), + makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); +} + TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) { _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); const TxnNumber txnNum = 20; @@ -93,36 +199,46 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) { } TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); - - { - OperationContextSession outerScopedSession(_opCtx); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); { - DirectClientSetter inDirectClient(_opCtx); - OperationContextSession innerScopedSession(_opCtx); - - auto session = OperationContextSession::get(_opCtx); - ASSERT(session); - ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + OperationContextSession outerScopedSession(_opCtx); + + { + DirectClientSetter inDirectClient(_opCtx); + OperationContextSession innerScopedSession(_opCtx); + + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + } + + { + DirectClientSetter inDirectClient(_opCtx); + auto session = OperationContextSession::get(_opCtx); + ASSERT(session); + ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); + } } - { - DirectClientSetter inDirectClient(_opCtx); - auto session = OperationContextSession::get(_opCtx); - ASSERT(session); - ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); - } - } + ASSERT(!OperationContextSession::get(_opCtx)); + }; - ASSERT(!OperationContextSession::get(_opCtx)); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, ScanSession) { - // Create three sessions in the catalog. - const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector<LogicalSessionId> { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -146,16 +262,24 @@ TEST_F(SessionCatalogTest, ScanSession) { ASSERT_EQ(lsids[2], session.get()->getSessionId()); }); + catalog()->scanSession(lsids[3], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[3], session.get()->getSessionId()); + }); + catalog()->scanSession(makeLogicalSessionIdForTest(), [](const ObservableSession&) { FAIL("The callback was called for non-existent session"); }); } TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { - // Create three sessions in the catalog. - const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector<LogicalSessionId> { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -178,8 +302,15 @@ TEST_F(SessionCatalogTest, ScanSessionMarkForReapWhenSessionIsIdle) { ASSERT_EQ(lsids[1], session.get()->getSessionId()); }); - catalog()->scanSession(lsids[2], [&lsids](const ObservableSession& session) { - ASSERT_EQ(lsids[2], session.get()->getSessionId()); + catalog()->scanSession(lsids[2], + [&lsids](ObservableSession& session) { session.markForReap(); }); + + catalog()->scanSession(lsids[2], [](const ObservableSession&) { + FAIL("The callback was called for non-existent session"); + }); + + catalog()->scanSession(lsids[3], [&lsids](const ObservableSession& session) { + ASSERT_EQ(lsids[3], session.get()->getSessionId()); }); } @@ -196,10 +327,14 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ASSERT(lsidsFound.empty()); lsidsFound.clear(); - // Create three sessions in the catalog. - const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector<LogicalSessionId> { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); for (const auto& lsid : lsids) { stdx::async(stdx::launch::async, [this, lsid] { @@ -213,7 +348,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { // Scan over all Sessions. catalog()->scanSessions(matcherAllSessions, workerFn); - ASSERT_EQ(3U, lsidsFound.size()); + ASSERT_EQ(4U, lsidsFound.size()); lsidsFound.clear(); // Scan over all Sessions, visiting a particular Session. @@ -223,13 +358,30 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { ASSERT_EQ(1U, lsidsFound.size()); ASSERT_EQ(lsids[2], lsidsFound.front()); lsidsFound.clear(); + + // Scan over all Sessions, visiting a Session with child Sessions. + SessionKiller::Matcher matcherLSID1( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsids[1])}); + catalog()->scanSessions(matcherLSID1, workerFn); + ASSERT_EQ(1U, lsidsFound.size()); + ASSERT_EQ(lsids[1], lsidsFound.front()); + // TODO (SERVER-58755): Make the SessionKiller::Matcher also return the child sessions. + // ASSERT_EQ(3U, lsidsFound.size()); + // ASSERT_EQ(lsids[1], lsidsFound[0]); + // ASSERT_EQ(lsids[2], lsidsFound[1]); + // ASSERT_EQ(lsids[3], lsidsFound[2]); + lsidsFound.clear(); } TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { - // Create three sessions in the catalog. - const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector<LogicalSessionId> { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(lsid1); + auto lsid3 = makeLogicalSessionIdWithTxnUUIDForTest(lsid1); + return {lsid0, lsid1, lsid2, lsid3}; + }(); unittest::Barrier sessionsCheckedOut(2); unittest::Barrier sessionsCheckedIn(2); @@ -244,7 +396,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { }); // After this wait, session 1 is checked-out and waiting on the barrier, because of which only - // sessions 0 and 2 will be reaped + // sessions 0, 2 and 3 will be reaped. sessionsCheckedOut.countDownAndWait(); SessionKiller::Matcher matcherAllSessions( @@ -267,177 +419,278 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessionsMarkForReap) { } TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Create the session so there is something to kill - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + auto killToken = catalog()->killSession(lsid); - auto killToken = catalog()->killSession(lsid); + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } - // Make sure that regular session check-out will fail because the session is marked as killed - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(sideOpCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); - // Schedule a separate "regular operation" thread, which will block on checking-out the session, - // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [this, lsid] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(sideOpCtx.get()); - }); - ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } - // Make sure that "for kill" session check-out succeeds - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Make sure that session check-out after kill succeeds again - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; - // Make sure the "regular operation" eventually is able to proceed and use the just killed - // session - future.get(); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + auto killToken = [this, &lsid] { + // Create the session so there is something to kill + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession operationContextSession(opCtx.get()); - auto killToken = [this, &lsid] { - // Create the session so there is something to kill - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession operationContextSession(opCtx.get()); + auto killToken = catalog()->killSession(lsid); - auto killToken = catalog()->killSession(lsid); + // Make sure the owning operation context is interrupted + ASSERT_THROWS_CODE( + opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); - // Make sure the owning operation context is interrupted - ASSERT_THROWS_CODE(opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); + // Make sure that the checkOutForKill call will wait for the owning operation context to + // check the session back in + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + OperationContextSession ocs(sideOpCtx.get()); + }); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); - // Make sure that the checkOutForKill call will wait for the owning operation context to - // check the session back in + return killToken; + }(); + + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out auto future = stdx::async(stdx::launch::async, [this, lsid] { ThreadClient tc(getServiceContext()); auto sideOpCtx = Client::getCurrent()->makeOperationContext(); sideOpCtx->setLogicalSessionId(lsid); - sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); OperationContextSession ocs(sideOpCtx.get()); }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } - ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - return killToken; - }(); + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; - // Make sure that regular session check-out will fail because the session is marked as killed - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); +} - // Schedule a separate "regular operation" thread, which will block on checking-out the session, - // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [this, lsid] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(sideOpCtx.get()); - }); - ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); +TEST_F(SessionCatalogTest, KillParentSessionWhenChildSessionIsCheckedOut) { + auto runTest = [&](const LogicalSessionId& parentLsid, const LogicalSessionId& childLsid) { + auto killToken = [this, &parentLsid, &childLsid] { + // Create the session so there is something to kill + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession operationContextSession(opCtx.get()); - // Make sure that "for kill" session check-out succeeds - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + auto killToken = catalog()->killSession(parentLsid); - // Make sure that session check-out after kill succeeds again - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + // Make sure the owning operation context is interrupted + ASSERT_THROWS_CODE( + opCtx->checkForInterrupt(), AssertionException, ErrorCodes::Interrupted); - // Make sure the "regular operation" eventually is able to proceed and use the just killed - // session - future.get(); + // Make sure that the checkOutForKill call will wait for the owning operation context to + // check the session back in + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(childLsid); + sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + OperationContextSession ocs(sideOpCtx.get()); + }); + + ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); + + return killToken; + }(); + + // Make sure that regular session check-out will fail because the session is marked as + // killed + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + + // Schedule a separate "regular operation" thread, which will block on checking-out the + // session, which we will use to confirm that session kill completion actually unblocks + // check-out + auto future = stdx::async(stdx::launch::async, [this, childLsid] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(sideOpCtx.get()); + }); + ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); + + // Make sure that "for kill" session check-out succeeds + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + + // Make sure that session check-out after kill succeeds again + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(childLsid); + OperationContextSession ocs(opCtx.get()); + } + + // Make sure the "regular operation" eventually is able to proceed and use the just killed + // session + future.get(); + }; + + auto parentLsid = makeLogicalSessionIdForTest(); + runTest(parentLsid, makeLogicalSessionIdWithTxnNumberForTest(parentLsid)); + runTest(parentLsid, makeLogicalSessionIdWithTxnUUIDForTest(parentLsid)); } TEST_F(SessionCatalogTest, MarkSessionAsKilledCanBeCalledMoreThanOnce) { - const auto lsid = makeLogicalSessionIdForTest(); + auto runTest = [&](const LogicalSessionId& lsid) { + // Create the session so there is something to kill + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + OperationContextSession ocs(opCtx.get()); + } - // Create the session so there is something to kill - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession ocs(opCtx.get()); - } + auto killToken1 = catalog()->killSession(lsid); + auto killToken2 = catalog()->killSession(lsid); - auto killToken1 = catalog()->killSession(lsid); - auto killToken2 = catalog()->killSession(lsid); + // Make sure that regular session check-out will fail because there are two killers on the + // session + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } - // Make sure that regular session check-out will fail because there are two killers on the - // session - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } + boost::optional<SessionCatalog::KillToken> killTokenWhileSessionIsCheckedOutForKill; - boost::optional<SessionCatalog::KillToken> killTokenWhileSessionIsCheckedOutForKill; + // Finish the first killer of the session + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - // Finish the first killer of the session - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken1)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + // Killing a session while checked out for kill should not affect the killers + killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid)); + } - // Killing a session while checked out for kill should not affect the killers - killTokenWhileSessionIsCheckedOutForKill.emplace(catalog()->killSession(lsid)); - } + // Regular session check-out should still fail because there are now still two killers on + // the session + { + auto opCtx = makeOperationContext(); + opCtx->setLogicalSessionId(lsid); + opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); + ASSERT_THROWS_CODE(OperationContextSession(opCtx.get()), + AssertionException, + ErrorCodes::MaxTimeMSExpired); + } + { + auto opCtx = makeOperationContext(); + auto scopedSession = + catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + { + auto opCtx = makeOperationContext(); + auto scopedSession = catalog()->checkOutSessionForKill( + opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill)); + ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); + } + }; - // Regular session check-out should still fail because there are now still two killers on the - // session - { - auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - opCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - ASSERT_THROWS_CODE( - OperationContextSession(opCtx.get()), AssertionException, ErrorCodes::MaxTimeMSExpired); - } - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill(opCtx.get(), std::move(killToken2)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } - { - auto opCtx = makeOperationContext(); - auto scopedSession = catalog()->checkOutSessionForKill( - opCtx.get(), std::move(*killTokenWhileSessionIsCheckedOutForKill)); - ASSERT_EQ(opCtx.get(), scopedSession.currentOperation_forTest()); - } + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) { @@ -447,41 +700,56 @@ TEST_F(SessionCatalogTest, MarkSessionsAsKilledWhenSessionDoesNotExist) { } TEST_F(SessionCatalogTestWithDefaultOpCtx, SessionDiscarOperationContextAfterCheckIn) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); - { - OperationContextSession ocs(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); + { + OperationContextSession ocs(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); + + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); + } - OperationContextSession::checkIn(_opCtx); ASSERT(!OperationContextSession::get(_opCtx)); - } + }; - ASSERT(!OperationContextSession::get(_opCtx)); + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTestWithDefaultOpCtx, SessionDiscarOperationContextAfterCheckInCheckOut) { - _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); + auto runTest = [&](const LogicalSessionId& lsid) { + _opCtx->setLogicalSessionId(lsid); - { - OperationContextSession ocs(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); + { + OperationContextSession ocs(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); - OperationContextSession::checkIn(_opCtx); - ASSERT(!OperationContextSession::get(_opCtx)); + OperationContextSession::checkIn(_opCtx); + ASSERT(!OperationContextSession::get(_opCtx)); - OperationContextSession::checkOut(_opCtx); - ASSERT(OperationContextSession::get(_opCtx)); - } + OperationContextSession::checkOut(_opCtx); + ASSERT(OperationContextSession::get(_opCtx)); + } - ASSERT(!OperationContextSession::get(_opCtx)); + ASSERT(!OperationContextSession::get(_opCtx)); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { - // Create three sessions - const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest(), - makeLogicalSessionIdForTest()}; + // Create sessions in the catalog. + const auto lsids = []() -> std::vector<LogicalSessionId> { + auto lsid0 = makeLogicalSessionIdForTest(); + auto lsid1 = makeLogicalSessionIdWithTxnNumberForTest(); + auto lsid2 = makeLogicalSessionIdWithTxnNumberForTest(); + return {lsid0, lsid1, lsid2}; + }(); std::vector<stdx::future<void>> futures; unittest::Barrier firstUseOfTheSessionReachedBarrier(lsids.size() + 1); @@ -556,62 +824,73 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { // Even if the implementaion has a bug, the test may not always fail depending on thread // scheduling, however, this test case still gives us a good coverage. TEST_F(SessionCatalogTestWithDefaultOpCtx, ConcurrentCheckOutAndKill) { - auto lsid = makeLogicalSessionIdForTest(); - _opCtx->setLogicalSessionId(lsid); - - stdx::future<void> normalCheckOutFinish, killCheckOutFinish; + auto runTest = [&](const LogicalSessionId& lsid) { + auto client = getServiceContext()->makeClient("ConcurrentCheckOutAndKill"); + AlternativeClientRegion acr(client); + auto opCtx = cc().makeOperationContext(); + opCtx->setLogicalSessionId(lsid); - // This variable is protected by the session check-out. - std::string lastSessionCheckOut = "first session"; - { - // Check out the session to block both normal check-out and checkOutForKill. - OperationContextSession firstCheckOut(_opCtx); + stdx::future<void> normalCheckOutFinish, killCheckOutFinish; - // Normal check out should start after kill. - normalCheckOutFinish = stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession normalCheckOut(sideOpCtx.get()); - ASSERT_EQ("session kill", lastSessionCheckOut); - lastSessionCheckOut = "session checkout"; - }); + // This variable is protected by the session check-out. + std::string lastSessionCheckOut = "first session"; + { + // Check out the session to block both normal check-out and checkOutForKill. + OperationContextSession firstCheckOut(opCtx.get()); - // Kill will short-cut the queue and be the next one to check out. - killCheckOutFinish = stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); + // Normal check out should start after kill. + normalCheckOutFinish = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + OperationContextSession normalCheckOut(sideOpCtx.get()); + ASSERT_EQ("session kill", lastSessionCheckOut); + lastSessionCheckOut = "session checkout"; + }); - // Kill the session - std::vector<SessionCatalog::KillToken> killTokens; - catalog()->scanSessions( - SessionKiller::Matcher( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(sideOpCtx.get())}), - [&killTokens](const ObservableSession& session) { - killTokens.emplace_back(session.kill(ErrorCodes::InternalError)); - }); - ASSERT_EQ(1U, killTokens.size()); - auto checkOutSessionForKill( - catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0]))); - ASSERT_EQ("first session", lastSessionCheckOut); - lastSessionCheckOut = "session kill"; - }); + // Kill will short-cut the queue and be the next one to check out. + killCheckOutFinish = stdx::async(stdx::launch::async, [&] { + ThreadClient tc(getServiceContext()); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + sideOpCtx->setLogicalSessionId(lsid); + + // Kill the session + std::vector<SessionCatalog::KillToken> killTokens; + catalog()->scanSessions( + SessionKiller::Matcher( + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsid)}), + [&killTokens](const ObservableSession& session) { + killTokens.emplace_back(session.kill(ErrorCodes::InternalError)); + }); + + ASSERT_EQ(1U, killTokens.size()); + auto checkOutSessionForKill( + catalog()->checkOutSessionForKill(sideOpCtx.get(), std::move(killTokens[0]))); + + ASSERT_EQ("first session", lastSessionCheckOut); + lastSessionCheckOut = "session kill"; + }); - // The main thread won't check in the session until it's killed. - { - auto m = MONGO_MAKE_LATCH(); - stdx::condition_variable cond; - stdx::unique_lock<Latch> lock(m); - ASSERT_THROWS_CODE( - _opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }), - DBException, - ErrorCodes::InternalError); + // The main thread won't check in the session until it's killed. + { + auto m = MONGO_MAKE_LATCH(); + stdx::condition_variable cond; + stdx::unique_lock<Latch> lock(m); + ASSERT_THROWS_CODE( + opCtx->waitForConditionOrInterrupt(cond, lock, [] { return false; }), + DBException, + ErrorCodes::InternalError); + } } - } - normalCheckOutFinish.get(); - killCheckOutFinish.get(); - ASSERT_EQ("session checkout", lastSessionCheckOut); + normalCheckOutFinish.get(); + killCheckOutFinish.get(); + + ASSERT_EQ("session checkout", lastSessionCheckOut); + }; + + runTest(makeLogicalSessionIdForTest()); + runTest(makeLogicalSessionIdWithTxnNumberForTest()); + runTest(makeLogicalSessionIdWithTxnUUIDForTest()); } } // namespace |