diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-11-29 09:19:26 -0500 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2018-12-06 07:54:00 -0500 |
commit | e5967d7b430fb8eb8a2e56ab9bdaa946e7bbdfa8 (patch) | |
tree | 204072ee4aae802cb603f14067d4d7a774c0c535 /src/mongo/db | |
parent | 55ade18c488a08d160a3341799b9ea276f262d08 (diff) | |
download | mongo-e5967d7b430fb8eb8a2e56ab9bdaa946e7bbdfa8.tar.gz |
SERVER-37923 Make chunk migration check-out the session
With this change there are no callers which modify the session state
without having checked it out first.
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 21 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 10 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.cpp | 36 | ||||
-rw-r--r-- | src/mongo/db/session_catalog.h | 81 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_test.cpp | 118 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 207 |
7 files changed, 99 insertions, 376 deletions
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index afd07c8ac34..cb0b8c620c4 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -519,21 +519,17 @@ public: void setUp() override { OpObserverTest::setUp(); _opCtx = cc().makeOperationContext(); + _opObserver.emplace(); MongoDSessionCatalog::onStepUp(opCtx()); - - // Create a session. - auto sessionCatalog = SessionCatalog::get(getServiceContext()); - auto sessionId = makeLogicalSessionIdForTest(); - _session = sessionCatalog->getOrCreateSession(opCtx(), sessionId); - _times.emplace(opCtx()); - opCtx()->setLogicalSessionId(session()->getSessionId()); - opCtx()->setTxnNumber(txnNum()); + opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); + opCtx()->setTxnNumber(txnNum()); _sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx()); - auto txnParticipant = TransactionParticipant::get(opCtx()); + + const auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, true); } @@ -592,7 +588,7 @@ protected: } Session* session() { - return _session->get(); + return OperationContextSession::get(opCtx()); } OpObserverImpl& opObserver() { @@ -613,10 +609,11 @@ private: typedef OpObserver::ReservedTimes ReservedTimes; }; - boost::optional<OpObserverImpl> _opObserver; - boost::optional<ScopedSession> _session; ServiceContext::UniqueOperationContext _opCtx; + + boost::optional<OpObserverImpl> _opObserver; boost::optional<ExposeOpObserverTimes::ReservedTimes> _times; + std::unique_ptr<MongoDOperationContextSession> _sessionCheckout; TxnNumber _txnNum = 0; }; diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index b7d04fcbfcb..4d2a9c32e97 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -247,7 +247,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, const auto stmtId = *oplogEntry.getStatementId(); - auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId); + auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId); auto const txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); txnParticipant->refreshFromStorageIfNeeded(opCtx); diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 1f52e217372..ef641fce877 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -164,10 +164,10 @@ public: return _migrationId.value(); } - ScopedSession getSessionWithTxn(OperationContext* opCtx, - const LogicalSessionId& sessionId, - const TxnNumber& txnNum) { - auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId); + ScopedCheckedOutSession getSessionWithTxn(OperationContext* opCtx, + const LogicalSessionId& sessionId, + const TxnNumber& txnNum) { + auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, sessionId); const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -1590,7 +1590,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, // in TransactionTooOld. This should not preclude the entries for session 2 from getting // applied. auto scopedSession = - SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, *sessionInfo1.getSessionId()); + SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId()); const auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get()); txnParticipant->refreshFromStorageIfNeeded(opCtx); diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp index 94ee9fa940b..a708c290acf 100644 --- a/src/mongo/db/session_catalog.cpp +++ b/src/mongo/db/session_catalog.cpp @@ -73,15 +73,18 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) { } ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) { + invariant(opCtx->getLogicalSessionId()); + return checkOutSession(opCtx, *opCtx->getLogicalSessionId()); +} + +ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx, + const LogicalSessionId& lsid) { // This method is not supposed to be called with an already checked-out session due to risk of // deadlock invariant(!operationSessionDecoration(opCtx)); invariant(!opCtx->lockState()->inAWriteUnitOfWork()); invariant(!opCtx->lockState()->isLocked()); - invariant(opCtx->getLogicalSessionId()); - const auto lsid = *opCtx->getLogicalSessionId(); - stdx::unique_lock<stdx::mutex> ul(_mutex); auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid); @@ -94,7 +97,7 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) sri->session._markCheckedOut(ul, opCtx); return ScopedCheckedOutSession( - opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */); + *this, std::move(sri), boost::none /* Not checked out for kill */); } ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx, @@ -118,21 +121,7 @@ ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* sri->session._markCheckedOut(ul, opCtx); return ScopedCheckedOutSession( - opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */); -} - -ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx, - const LogicalSessionId& lsid) { - invariant(!opCtx->lockState()->isLocked()); - invariant(!opCtx->getLogicalSessionId()); - invariant(!opCtx->getTxnNumber()); - - auto ss = [&] { - stdx::unique_lock<stdx::mutex> ul(_mutex); - return ScopedSession(_getOrCreateSessionRuntimeInfo(ul, opCtx, lsid)); - }(); - - return ss; + *this, std::move(sri), std::move(killToken) /* Checked out for kill */); } void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher, @@ -167,14 +156,13 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate return it->second; } -void SessionCatalog::_releaseSession(const LogicalSessionId& lsid, +void SessionCatalog::_releaseSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri, boost::optional<Session::KillToken> killToken) { stdx::lock_guard<stdx::mutex> lg(_mutex); - auto it = _sessions.find(lsid); - invariant(it != _sessions.end()); - - auto& sri = it->second; + // Make sure we have exactly the same session on the map and that it is still associated with an + // operation context (meaning checked-out) + invariant(_sessions[sri->session.getSessionId()] == sri); invariant(sri->session.currentOperation()); sri->session._markCheckedIn(lg); diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h index 62eeca00acc..1ead5cad736 100644 --- a/src/mongo/db/session_catalog.h +++ b/src/mongo/db/session_catalog.h @@ -75,9 +75,12 @@ public: void reset_forTest(); /** - * Potentially blocking call, which uses the session information stored in the specified - * operation context and either creates a brand new session object (if one doesn't exist) or - * "checks-out" the existing one (if it is not currently in use or marked for kill). + * Potentially blocking call, which either creates a brand new session object (if one doesn't + * exist) or "checks-out" the existing one (if it is not currently in use or marked for kill). + * + * The 'opCtx'-only variant uses the session information stored on the operation context and the + * variant, which has the 'lsid' parameter checks-out that session id. Neither of these methods + * can be called with an already checked-out session. * * Checking out a session puts it in the 'checked out' state and all subsequent calls to * checkout will block until it is checked back in. This happens when the returned object goes @@ -86,6 +89,7 @@ public: * Throws exception on errors. */ ScopedCheckedOutSession checkOutSession(OperationContext* opCtx); + ScopedCheckedOutSession checkOutSession(OperationContext* opCtx, const LogicalSessionId& lsid); /** * See the description of 'Session::kill' for more information on the session kill usage @@ -95,17 +99,6 @@ public: Session::KillToken killToken); /** - * Returns a reference to the specified cached session regardless of whether it is checked-out - * or not. The returned session is not returned checked-out and is allowed to be checked-out - * concurrently. - * - * The intended usage for this method is to allow migrations to run in parallel with writes for - * the same session without blocking it. Because of this, it may not be used from operations - * which run on a session. - */ - ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid); - - /** * Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to * each Session which matches the specified 'matcher'. * @@ -150,7 +143,7 @@ private: /** * Makes a session, previously checked out through 'checkoutSession', available again. */ - void _releaseSession(const LogicalSessionId& lsid, + void _releaseSession(std::shared_ptr<SessionRuntimeInfo> sri, boost::optional<Session::KillToken> killToken); stdx::mutex _mutex; @@ -160,43 +153,14 @@ private: }; /** - * Scoped object representing a reference to a session. - */ -class ScopedSession { -public: - explicit ScopedSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri) - : _sri(std::move(sri)) { - invariant(_sri); - } - - Session* get() const { - return &_sri->session; - } - - Session* operator->() const { - return get(); - } - - Session& operator*() const { - return *get(); - } - - operator bool() const { - return !!_sri; - } - -private: - std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri; -}; - -/** * Scoped object representing a checked-out session. See comments for the 'checkoutSession' method * for more information on its behaviour. */ class ScopedCheckedOutSession { MONGO_DISALLOW_COPYING(ScopedCheckedOutSession); - friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*); + friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*, + const LogicalSessionId&); friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*, Session::KillToken); @@ -204,14 +168,13 @@ public: ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default; ~ScopedCheckedOutSession() { - if (_scopedSession) { - SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId(), - std::move(_killToken)); + if (_sri) { + _catalog._releaseSession(std::move(_sri), std::move(_killToken)); } } Session* get() const { - return _scopedSession.get(); + return &_sri->session; } Session* operator->() const { @@ -223,22 +186,22 @@ public: } operator bool() const { - return _scopedSession; + return bool(_sri); } private: - ScopedCheckedOutSession(OperationContext* opCtx, - ScopedSession scopedSession, + ScopedCheckedOutSession(SessionCatalog& catalog, + std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri, boost::optional<Session::KillToken> killToken) - : _opCtx(opCtx), - _killToken(std::move(killToken)), - _scopedSession(std::move(scopedSession)) {} + : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {} - OperationContext* const _opCtx; + // The owning session catalog into which the session should be checked back + SessionCatalog& _catalog; - boost::optional<Session::KillToken> _killToken; + std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri; - ScopedSession _scopedSession; + // Only set if the session was obtained though checkOutSessionForKill + boost::optional<Session::KillToken> _killToken; }; /** diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp index 66d4ae6d2be..48431675529 100644 --- a/src/mongo/db/session_catalog_test.cpp +++ b/src/mongo/db/session_catalog_test.cpp @@ -92,44 +92,6 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) { ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId()); } -TEST_F(SessionCatalogTestWithDefaultOpCtx, GetOrCreateNonExistentSession) { - const auto lsid = makeLogicalSessionIdForTest(); - auto scopedSession = catalog()->getOrCreateSession(_opCtx, lsid); - - ASSERT(scopedSession.get()); - ASSERT_EQ(lsid, scopedSession->getSessionId()); -} - -TEST_F(SessionCatalogTestWithDefaultOpCtx, GetOrCreateSessionAfterCheckOutSession) { - const auto lsid = makeLogicalSessionIdForTest(); - _opCtx->setLogicalSessionId(lsid); - - boost::optional<OperationContextSession> ocs; - ocs.emplace(_opCtx); - - stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getGlobalServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - auto scopedSession = - SessionCatalog::get(sideOpCtx.get())->getOrCreateSession(sideOpCtx.get(), lsid); - - ASSERT(scopedSession.get()); - ASSERT_EQ(lsid, scopedSession->getSessionId()); - }).get(); - - ocs.reset(); - - stdx::async(stdx::launch::async, [&] { - ThreadClient tc(getGlobalServiceContext()); - auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - auto scopedSession = - SessionCatalog::get(sideOpCtx.get())->getOrCreateSession(sideOpCtx.get(), lsid); - - ASSERT(scopedSession.get()); - ASSERT_EQ(lsid, scopedSession->getSessionId()); - }).get(); -} - TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { _opCtx->setLogicalSessionId(makeLogicalSessionIdForTest()); @@ -157,39 +119,42 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) { } TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) { - std::vector<LogicalSessionId> lsids; - const auto workerFn = [&lsids](WithLock, Session* session) { - lsids.push_back(session->getSessionId()); + std::vector<LogicalSessionId> lsidsFound; + const auto workerFn = [&lsidsFound](WithLock, Session* session) { + lsidsFound.push_back(session->getSessionId()); }; // Scan over zero Sessions. SessionKiller::Matcher matcherAllSessions( KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)}); catalog()->scanSessions(matcherAllSessions, workerFn); - ASSERT(lsids.empty()); + ASSERT(lsidsFound.empty()); + lsidsFound.clear(); // Create three sessions in the catalog. - auto lsid1 = makeLogicalSessionIdForTest(); - auto lsid2 = makeLogicalSessionIdForTest(); - auto lsid3 = makeLogicalSessionIdForTest(); - { - auto scopedSession1 = catalog()->getOrCreateSession(_opCtx, lsid1); - auto scopedSession2 = catalog()->getOrCreateSession(_opCtx, lsid2); - auto scopedSession3 = catalog()->getOrCreateSession(_opCtx, lsid3); + const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest(), + makeLogicalSessionIdForTest()}; + for (const auto& lsid : lsids) { + stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); + auto opCtx = makeOperationContext(); + const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid)); + }).get(); } // Scan over all Sessions. - lsids.clear(); catalog()->scanSessions(matcherAllSessions, workerFn); - ASSERT_EQ(lsids.size(), 3U); + ASSERT_EQ(3U, lsidsFound.size()); + lsidsFound.clear(); // Scan over all Sessions, visiting a particular Session. SessionKiller::Matcher matcherLSID2( - KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsid2)}); - lsids.clear(); + KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsids[2])}); catalog()->scanSessions(matcherLSID2, workerFn); - ASSERT_EQ(lsids.size(), 1U); - ASSERT_EQ(lsids.front(), lsid2); + ASSERT_EQ(1U, lsidsFound.size()); + ASSERT_EQ(lsids[2], lsidsFound.front()); + lsidsFound.clear(); } TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { @@ -198,8 +163,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { // Create the session so there is something to kill { auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession unusedOperationContextSession(opCtx.get()); + const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid)); } auto killToken = catalog()->killSession(lsid); @@ -215,12 +179,10 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { // Schedule a separate "regular operation" thread, which will block on checking-out the session, // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [lsid] { - ThreadClient tc(getGlobalServiceContext()); + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - - OperationContextSession unusedOperationContextSession(sideOpCtx.get()); + const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid)); }); ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); @@ -234,8 +196,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) { // Make sure that session check-out after kill succeeds again { auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession unusedOperationContextSession(opCtx.get()); + const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid)); } // Make sure the "regular operation" eventually is able to proceed and use the just killed @@ -259,13 +220,13 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { // Make sure that the checkOutForKill call will wait for the owning operation context to // check the session back in - auto future = stdx::async(stdx::launch::async, [lsid] { - ThreadClient tc(getGlobalServiceContext()); + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); auto sideOpCtx = Client::getCurrent()->makeOperationContext(); sideOpCtx->setLogicalSessionId(lsid); sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired); - OperationContextSession unusedOperationContextSession(sideOpCtx.get()); + const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid)); }); ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired); @@ -284,12 +245,10 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { // Schedule a separate "regular operation" thread, which will block on checking-out the session, // which we will use to confirm that session kill completion actually unblocks check-out - auto future = stdx::async(stdx::launch::async, [lsid] { - ThreadClient tc(getGlobalServiceContext()); + auto future = stdx::async(stdx::launch::async, [this, lsid] { + ThreadClient tc(getServiceContext()); auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); - - OperationContextSession unusedOperationContextSession(sideOpCtx.get()); + const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid)); }); ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration())); @@ -303,8 +262,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) { // Make sure that session check-out after kill succeeds again { auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession unusedOperationContextSession(opCtx.get()); + const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid)); } // Make sure the "regular operation" eventually is able to proceed and use the just killed @@ -318,8 +276,7 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledThrowsWhenCalledTwice) { // Create the session so there is something to kill { auto opCtx = makeOperationContext(); - opCtx->setLogicalSessionId(lsid); - OperationContextSession unusedOperationContextSession(opCtx.get()); + const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid)); } auto killToken = catalog()->killSession(lsid); @@ -394,14 +351,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { for (const auto& lsid : lsids) { futures.emplace_back( - stdx::async(stdx::launch::async, [lsid, &firstUseOfTheSessionReachedBarrier] { - ThreadClient tc(getGlobalServiceContext()); + stdx::async(stdx::launch::async, [this, lsid, &firstUseOfTheSessionReachedBarrier] { + ThreadClient tc(getServiceContext()); { auto sideOpCtx = Client::getCurrent()->makeOperationContext(); - sideOpCtx->setLogicalSessionId(lsid); + const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid)); - OperationContextSession unusedOperationContextSession(sideOpCtx.get()); firstUseOfTheSessionReachedBarrier.countDownAndWait(); ASSERT_THROWS_CODE(sideOpCtx->sleepFor(Hours{6}), @@ -413,7 +369,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) { auto sideOpCtx = Client::getCurrent()->makeOperationContext(); sideOpCtx->setLogicalSessionId(lsid); - OperationContextSession unusedOperationContextSession(sideOpCtx.get()); + const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid)); } })); } diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index b873ecb43d7..99adc25c1ed 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -247,30 +247,6 @@ protected: func(newOpCtx.get()); } - void bumpTxnNumberFromDifferentOpCtx(const LogicalSessionId& sessionId, TxnNumber newTxnNum) { - auto func = [sessionId, newTxnNum](OperationContext* opCtx) { - auto session = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId); - auto txnParticipant = - TransactionParticipant::getFromNonCheckedOutSession(session.get()); - - // Check that there is a transaction in progress with a lower txnNumber. - ASSERT(txnParticipant->inMultiDocumentTransaction()); - ASSERT_LT(txnParticipant->getActiveTxnNumber(), newTxnNum); - - // Check that the transaction has some operations, so we can ensure they are cleared. - ASSERT_GT(txnParticipant->transactionOperationsForTest().size(), 0u); - - // Bump the active transaction number on the txnParticipant. This should clear all state - // from the previous transaction. - txnParticipant->beginOrContinue(newTxnNum, boost::none, boost::none); - ASSERT_EQ(newTxnNum, txnParticipant->getActiveTxnNumber()); - ASSERT_FALSE(txnParticipant->transactionIsAborted()); - ASSERT_EQ(txnParticipant->transactionOperationsForTest().size(), 0u); - }; - - runFunctionFromDifferentOpCtx(func); - } - std::unique_ptr<MongoDOperationContextSession> checkOutSession() { auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx()); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -643,28 +619,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndAbort) { ErrorCodes::NoSuchTransaction); } -TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - - // The transaction machinery cannot store an empty locker. - { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); } - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - txnParticipant->stashTransactionResources(opCtx()); - - // A migration may bump the active transaction number without checking out the - // txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // An unstash after a migration that bumps the active transaction number should uassert. - ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "insert"), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); -} - TEST_F(TxnParticipantTest, ConcurrencyOfStashAndAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -677,25 +631,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfStashAndAbort) { txnParticipant->stashTransactionResources(opCtx()); } -TEST_F(TxnParticipantTest, ConcurrencyOfStashAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - // A migration may bump the active transaction number without checking out the - // txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // A stash after a migration that bumps the active transaction number should uassert. - ASSERT_THROWS_CODE(txnParticipant->stashTransactionResources(opCtx()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); -} - TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -711,26 +646,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) { ErrorCodes::NoSuchTransaction); } -TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "find"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - // A migration may bump the active transaction number without checking out the - // txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // An addTransactionOperation() after a migration that bumps the active transaction number - // should uassert. - ASSERT_THROWS_CODE(txnParticipant->addTransactionOperation(opCtx(), operation), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); -} - TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -745,25 +660,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAb ErrorCodes::NoSuchTransaction); } -TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - // A migration may bump the active transaction number without checking out the txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction - // number should uassert. - ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); -} - TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -814,29 +710,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndArbitraryAbort) ASSERT(opCtx()->getWriteUnitOfWork() == nullptr); } -TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - ASSERT(txnParticipant->inMultiDocumentTransaction()); - - // A migration may bump the active transaction number without checking out the txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - ASSERT_THROWS_CODE(txnParticipant->abortActiveTransaction(opCtx()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); - - // The abort fails so the OperationContext state is not cleaned up until the operation is - // complete. The session has already moved on to a new transaction so the transaction will not - // remain active beyond this operation. - ASSERT_FALSE(opCtx()->getWriteUnitOfWork() == nullptr); -} - TEST_F(TxnParticipantTest, ConcurrencyOfActivePreparedAbortAndArbitraryAbort) { auto sessionCheckout = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); @@ -1032,45 +905,6 @@ TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointTo ASSERT_TRUE(txnParticipant->transactionIsAborted()); } -TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - // A migration may bump the active transaction number without checking out the txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // A commitUnpreparedTransaction() after a migration that bumps the active transaction number - // should uassert. - ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); -} - -TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndMigration) { - auto sessionCheckout = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - // A migration may bump the active transaction number without checking out the txnParticipant. - const auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum); - - // A prepareTransaction() after a migration that bumps the active transaction number should - // uassert. - ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); - ASSERT_FALSE(_opObserver->transactionPrepared); -} - TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) { // Check out a session, start the transaction and check it in. checkOutSession(); @@ -1164,24 +998,29 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); - + OperationContextSession::checkIn(opCtx()); { // Try to start a new transaction while there is already a prepared transaction on the // session. This should fail with a PreparedTransactionInProgress error. - auto func = [&](OperationContext* newOpCtx) { - auto session = SessionCatalog::get(newOpCtx)->getOrCreateSession( - newOpCtx, *opCtx()->getLogicalSessionId()); + auto func = [ + lsid = *opCtx()->getLogicalSessionId(), + txnNumberToStart = *opCtx()->getTxnNumber() + 1 + ](OperationContext * newOpCtx) { + newOpCtx->setLogicalSessionId(lsid); + newOpCtx->setTxnNumber(txnNumberToStart); + + auto session = SessionCatalog::get(newOpCtx)->checkOutSession(newOpCtx); auto txnParticipant = TransactionParticipant::getFromNonCheckedOutSession(session.get()); - ASSERT_THROWS_CODE( - txnParticipant->beginOrContinue(*opCtx()->getTxnNumber() + 1, false, true), - AssertionException, - ErrorCodes::PreparedTransactionInProgress); + ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNumberToStart, false, true), + AssertionException, + ErrorCodes::PreparedTransactionInProgress); }; runFunctionFromDifferentOpCtx(func); } + OperationContextSession::checkOut(opCtx()); ASSERT_FALSE(txnParticipant->transactionIsAborted()); ASSERT(_opObserver->transactionPrepared); @@ -1205,26 +1044,6 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) { ASSERT(_opObserver->transactionPrepared); } -TEST_F(TxnParticipantTest, MigrationThrowsOnPreparedTransaction) { - auto outerScopedSession = checkOutSession(); - auto txnParticipant = TransactionParticipant::get(opCtx()); - - txnParticipant->unstashTransactionResources(opCtx(), "insert"); - auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); - txnParticipant->addTransactionOperation(opCtx(), operation); - - txnParticipant->prepareTransaction(opCtx(), {}); - - // A migration may bump the active transaction number without checking out the session. - auto higherTxnNum = *opCtx()->getTxnNumber() + 1; - ASSERT_THROWS_CODE( - bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum), - AssertionException, - ErrorCodes::PreparedTransactionInProgress); - // The transaction is not affected. - ASSERT_TRUE(_opObserver->transactionPrepared); -} - TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) { auto outerScopedSession = checkOutSession(); auto txnParticipant = TransactionParticipant::get(opCtx()); |