summaryrefslogtreecommitdiff
path: root/src/mongo/db
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-11-29 09:19:26 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-12-06 07:54:00 -0500
commite5967d7b430fb8eb8a2e56ab9bdaa946e7bbdfa8 (patch)
tree204072ee4aae802cb603f14067d4d7a774c0c535 /src/mongo/db
parent55ade18c488a08d160a3341799b9ea276f262d08 (diff)
downloadmongo-e5967d7b430fb8eb8a2e56ab9bdaa946e7bbdfa8.tar.gz
SERVER-37923 Make chunk migration check-out the session
With this change there are no callers which modify the session state without having checked it out first.
Diffstat (limited to 'src/mongo/db')
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp21
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp10
-rw-r--r--src/mongo/db/session_catalog.cpp36
-rw-r--r--src/mongo/db/session_catalog.h81
-rw-r--r--src/mongo/db/session_catalog_test.cpp118
-rw-r--r--src/mongo/db/transaction_participant_test.cpp207
7 files changed, 99 insertions, 376 deletions
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index afd07c8ac34..cb0b8c620c4 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -519,21 +519,17 @@ public:
void setUp() override {
OpObserverTest::setUp();
_opCtx = cc().makeOperationContext();
+
_opObserver.emplace();
MongoDSessionCatalog::onStepUp(opCtx());
-
- // Create a session.
- auto sessionCatalog = SessionCatalog::get(getServiceContext());
- auto sessionId = makeLogicalSessionIdForTest();
- _session = sessionCatalog->getOrCreateSession(opCtx(), sessionId);
-
_times.emplace(opCtx());
- opCtx()->setLogicalSessionId(session()->getSessionId());
- opCtx()->setTxnNumber(txnNum());
+ opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ opCtx()->setTxnNumber(txnNum());
_sessionCheckout = std::make_unique<MongoDOperationContextSession>(opCtx());
- auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, true);
}
@@ -592,7 +588,7 @@ protected:
}
Session* session() {
- return _session->get();
+ return OperationContextSession::get(opCtx());
}
OpObserverImpl& opObserver() {
@@ -613,10 +609,11 @@ private:
typedef OpObserver::ReservedTimes ReservedTimes;
};
- boost::optional<OpObserverImpl> _opObserver;
- boost::optional<ScopedSession> _session;
ServiceContext::UniqueOperationContext _opCtx;
+
+ boost::optional<OpObserverImpl> _opObserver;
boost::optional<ExposeOpObserverTimes::ReservedTimes> _times;
+
std::unique_ptr<MongoDOperationContextSession> _sessionCheckout;
TxnNumber _txnNum = 0;
};
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index b7d04fcbfcb..4d2a9c32e97 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -247,7 +247,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
const auto stmtId = *oplogEntry.getStatementId();
- auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, result.sessionId);
+ auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId);
auto const txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
txnParticipant->refreshFromStorageIfNeeded(opCtx);
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 1f52e217372..ef641fce877 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -164,10 +164,10 @@ public:
return _migrationId.value();
}
- ScopedSession getSessionWithTxn(OperationContext* opCtx,
- const LogicalSessionId& sessionId,
- const TxnNumber& txnNum) {
- auto scopedSession = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
+ ScopedCheckedOutSession getSessionWithTxn(OperationContext* opCtx,
+ const LogicalSessionId& sessionId,
+ const TxnNumber& txnNum) {
+ auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, sessionId);
const auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -1590,7 +1590,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
// in TransactionTooOld. This should not preclude the entries for session 2 from getting
// applied.
auto scopedSession =
- SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, *sessionInfo1.getSessionId());
+ SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId());
const auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(scopedSession.get());
txnParticipant->refreshFromStorageIfNeeded(opCtx);
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 94ee9fa940b..a708c290acf 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -73,15 +73,18 @@ SessionCatalog* SessionCatalog::get(ServiceContext* service) {
}
ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx) {
+ invariant(opCtx->getLogicalSessionId());
+ return checkOutSession(opCtx, *opCtx->getLogicalSessionId());
+}
+
+ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx,
+ const LogicalSessionId& lsid) {
// This method is not supposed to be called with an already checked-out session due to risk of
// deadlock
invariant(!operationSessionDecoration(opCtx));
invariant(!opCtx->lockState()->inAWriteUnitOfWork());
invariant(!opCtx->lockState()->isLocked());
- invariant(opCtx->getLogicalSessionId());
- const auto lsid = *opCtx->getLogicalSessionId();
-
stdx::unique_lock<stdx::mutex> ul(_mutex);
auto sri = _getOrCreateSessionRuntimeInfo(ul, opCtx, lsid);
@@ -94,7 +97,7 @@ ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext* opCtx)
sri->session._markCheckedOut(ul, opCtx);
return ScopedCheckedOutSession(
- opCtx, ScopedSession(std::move(sri)), boost::none /* Not checked out for kill */);
+ *this, std::move(sri), boost::none /* Not checked out for kill */);
}
ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext* opCtx,
@@ -118,21 +121,7 @@ ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*
sri->session._markCheckedOut(ul, opCtx);
return ScopedCheckedOutSession(
- opCtx, ScopedSession(std::move(sri)), std::move(killToken) /* Checked out for kill */);
-}
-
-ScopedSession SessionCatalog::getOrCreateSession(OperationContext* opCtx,
- const LogicalSessionId& lsid) {
- invariant(!opCtx->lockState()->isLocked());
- invariant(!opCtx->getLogicalSessionId());
- invariant(!opCtx->getTxnNumber());
-
- auto ss = [&] {
- stdx::unique_lock<stdx::mutex> ul(_mutex);
- return ScopedSession(_getOrCreateSessionRuntimeInfo(ul, opCtx, lsid));
- }();
-
- return ss;
+ *this, std::move(sri), std::move(killToken) /* Checked out for kill */);
}
void SessionCatalog::scanSessions(const SessionKiller::Matcher& matcher,
@@ -167,14 +156,13 @@ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> SessionCatalog::_getOrCreate
return it->second;
}
-void SessionCatalog::_releaseSession(const LogicalSessionId& lsid,
+void SessionCatalog::_releaseSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
boost::optional<Session::KillToken> killToken) {
stdx::lock_guard<stdx::mutex> lg(_mutex);
- auto it = _sessions.find(lsid);
- invariant(it != _sessions.end());
-
- auto& sri = it->second;
+ // Make sure we have exactly the same session on the map and that it is still associated with an
+ // operation context (meaning checked-out)
+ invariant(_sessions[sri->session.getSessionId()] == sri);
invariant(sri->session.currentOperation());
sri->session._markCheckedIn(lg);
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 62eeca00acc..1ead5cad736 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -75,9 +75,12 @@ public:
void reset_forTest();
/**
- * Potentially blocking call, which uses the session information stored in the specified
- * operation context and either creates a brand new session object (if one doesn't exist) or
- * "checks-out" the existing one (if it is not currently in use or marked for kill).
+ * Potentially blocking call, which either creates a brand new session object (if one doesn't
+ * exist) or "checks-out" the existing one (if it is not currently in use or marked for kill).
+ *
+ * The 'opCtx'-only variant uses the session information stored on the operation context and the
+ * variant, which has the 'lsid' parameter checks-out that session id. Neither of these methods
+ * can be called with an already checked-out session.
*
* Checking out a session puts it in the 'checked out' state and all subsequent calls to
* checkout will block until it is checked back in. This happens when the returned object goes
@@ -86,6 +89,7 @@ public:
* Throws exception on errors.
*/
ScopedCheckedOutSession checkOutSession(OperationContext* opCtx);
+ ScopedCheckedOutSession checkOutSession(OperationContext* opCtx, const LogicalSessionId& lsid);
/**
* See the description of 'Session::kill' for more information on the session kill usage
@@ -95,17 +99,6 @@ public:
Session::KillToken killToken);
/**
- * Returns a reference to the specified cached session regardless of whether it is checked-out
- * or not. The returned session is not returned checked-out and is allowed to be checked-out
- * concurrently.
- *
- * The intended usage for this method is to allow migrations to run in parallel with writes for
- * the same session without blocking it. Because of this, it may not be used from operations
- * which run on a session.
- */
- ScopedSession getOrCreateSession(OperationContext* opCtx, const LogicalSessionId& lsid);
-
- /**
* Iterates through the SessionCatalog under the SessionCatalog mutex and applies 'workerFn' to
* each Session which matches the specified 'matcher'.
*
@@ -150,7 +143,7 @@ private:
/**
* Makes a session, previously checked out through 'checkoutSession', available again.
*/
- void _releaseSession(const LogicalSessionId& lsid,
+ void _releaseSession(std::shared_ptr<SessionRuntimeInfo> sri,
boost::optional<Session::KillToken> killToken);
stdx::mutex _mutex;
@@ -160,43 +153,14 @@ private:
};
/**
- * Scoped object representing a reference to a session.
- */
-class ScopedSession {
-public:
- explicit ScopedSession(std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri)
- : _sri(std::move(sri)) {
- invariant(_sri);
- }
-
- Session* get() const {
- return &_sri->session;
- }
-
- Session* operator->() const {
- return get();
- }
-
- Session& operator*() const {
- return *get();
- }
-
- operator bool() const {
- return !!_sri;
- }
-
-private:
- std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri;
-};
-
-/**
* Scoped object representing a checked-out session. See comments for the 'checkoutSession' method
* for more information on its behaviour.
*/
class ScopedCheckedOutSession {
MONGO_DISALLOW_COPYING(ScopedCheckedOutSession);
- friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*);
+ friend ScopedCheckedOutSession SessionCatalog::checkOutSession(OperationContext*,
+ const LogicalSessionId&);
friend ScopedCheckedOutSession SessionCatalog::checkOutSessionForKill(OperationContext*,
Session::KillToken);
@@ -204,14 +168,13 @@ public:
ScopedCheckedOutSession(ScopedCheckedOutSession&&) = default;
~ScopedCheckedOutSession() {
- if (_scopedSession) {
- SessionCatalog::get(_opCtx)->_releaseSession(_scopedSession->getSessionId(),
- std::move(_killToken));
+ if (_sri) {
+ _catalog._releaseSession(std::move(_sri), std::move(_killToken));
}
}
Session* get() const {
- return _scopedSession.get();
+ return &_sri->session;
}
Session* operator->() const {
@@ -223,22 +186,22 @@ public:
}
operator bool() const {
- return _scopedSession;
+ return bool(_sri);
}
private:
- ScopedCheckedOutSession(OperationContext* opCtx,
- ScopedSession scopedSession,
+ ScopedCheckedOutSession(SessionCatalog& catalog,
+ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> sri,
boost::optional<Session::KillToken> killToken)
- : _opCtx(opCtx),
- _killToken(std::move(killToken)),
- _scopedSession(std::move(scopedSession)) {}
+ : _catalog(catalog), _sri(std::move(sri)), _killToken(std::move(killToken)) {}
- OperationContext* const _opCtx;
+ // The owning session catalog into which the session should be checked back
+ SessionCatalog& _catalog;
- boost::optional<Session::KillToken> _killToken;
+ std::shared_ptr<SessionCatalog::SessionRuntimeInfo> _sri;
- ScopedSession _scopedSession;
+ // Only set if the session was obtained though checkOutSessionForKill
+ boost::optional<Session::KillToken> _killToken;
};
/**
diff --git a/src/mongo/db/session_catalog_test.cpp b/src/mongo/db/session_catalog_test.cpp
index 66d4ae6d2be..48431675529 100644
--- a/src/mongo/db/session_catalog_test.cpp
+++ b/src/mongo/db/session_catalog_test.cpp
@@ -92,44 +92,6 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, OperationContextCheckedOutSession) {
ASSERT_EQ(*_opCtx->getLogicalSessionId(), session->getSessionId());
}
-TEST_F(SessionCatalogTestWithDefaultOpCtx, GetOrCreateNonExistentSession) {
- const auto lsid = makeLogicalSessionIdForTest();
- auto scopedSession = catalog()->getOrCreateSession(_opCtx, lsid);
-
- ASSERT(scopedSession.get());
- ASSERT_EQ(lsid, scopedSession->getSessionId());
-}
-
-TEST_F(SessionCatalogTestWithDefaultOpCtx, GetOrCreateSessionAfterCheckOutSession) {
- const auto lsid = makeLogicalSessionIdForTest();
- _opCtx->setLogicalSessionId(lsid);
-
- boost::optional<OperationContextSession> ocs;
- ocs.emplace(_opCtx);
-
- stdx::async(stdx::launch::async, [&] {
- ThreadClient tc(getGlobalServiceContext());
- auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- auto scopedSession =
- SessionCatalog::get(sideOpCtx.get())->getOrCreateSession(sideOpCtx.get(), lsid);
-
- ASSERT(scopedSession.get());
- ASSERT_EQ(lsid, scopedSession->getSessionId());
- }).get();
-
- ocs.reset();
-
- stdx::async(stdx::launch::async, [&] {
- ThreadClient tc(getGlobalServiceContext());
- auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- auto scopedSession =
- SessionCatalog::get(sideOpCtx.get())->getOrCreateSession(sideOpCtx.get(), lsid);
-
- ASSERT(scopedSession.get());
- ASSERT_EQ(lsid, scopedSession->getSessionId());
- }).get();
-}
-
TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
_opCtx->setLogicalSessionId(makeLogicalSessionIdForTest());
@@ -157,39 +119,42 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, NestedOperationContextSession) {
}
TEST_F(SessionCatalogTestWithDefaultOpCtx, ScanSessions) {
- std::vector<LogicalSessionId> lsids;
- const auto workerFn = [&lsids](WithLock, Session* session) {
- lsids.push_back(session->getSessionId());
+ std::vector<LogicalSessionId> lsidsFound;
+ const auto workerFn = [&lsidsFound](WithLock, Session* session) {
+ lsidsFound.push_back(session->getSessionId());
};
// Scan over zero Sessions.
SessionKiller::Matcher matcherAllSessions(
KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx)});
catalog()->scanSessions(matcherAllSessions, workerFn);
- ASSERT(lsids.empty());
+ ASSERT(lsidsFound.empty());
+ lsidsFound.clear();
// Create three sessions in the catalog.
- auto lsid1 = makeLogicalSessionIdForTest();
- auto lsid2 = makeLogicalSessionIdForTest();
- auto lsid3 = makeLogicalSessionIdForTest();
- {
- auto scopedSession1 = catalog()->getOrCreateSession(_opCtx, lsid1);
- auto scopedSession2 = catalog()->getOrCreateSession(_opCtx, lsid2);
- auto scopedSession3 = catalog()->getOrCreateSession(_opCtx, lsid3);
+ const std::vector<LogicalSessionId> lsids{makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest(),
+ makeLogicalSessionIdForTest()};
+ for (const auto& lsid : lsids) {
+ stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
+ auto opCtx = makeOperationContext();
+ const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
+ }).get();
}
// Scan over all Sessions.
- lsids.clear();
catalog()->scanSessions(matcherAllSessions, workerFn);
- ASSERT_EQ(lsids.size(), 3U);
+ ASSERT_EQ(3U, lsidsFound.size());
+ lsidsFound.clear();
// Scan over all Sessions, visiting a particular Session.
SessionKiller::Matcher matcherLSID2(
- KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsid2)});
- lsids.clear();
+ KillAllSessionsByPatternSet{makeKillAllSessionsByPattern(_opCtx, lsids[2])});
catalog()->scanSessions(matcherLSID2, workerFn);
- ASSERT_EQ(lsids.size(), 1U);
- ASSERT_EQ(lsids.front(), lsid2);
+ ASSERT_EQ(1U, lsidsFound.size());
+ ASSERT_EQ(lsids[2], lsidsFound.front());
+ lsidsFound.clear();
}
TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
@@ -198,8 +163,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- opCtx->setLogicalSessionId(lsid);
- OperationContextSession unusedOperationContextSession(opCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
}
auto killToken = catalog()->killSession(lsid);
@@ -215,12 +179,10 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
// Schedule a separate "regular operation" thread, which will block on checking-out the session,
// which we will use to confirm that session kill completion actually unblocks check-out
- auto future = stdx::async(stdx::launch::async, [lsid] {
- ThreadClient tc(getGlobalServiceContext());
+ auto future = stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- sideOpCtx->setLogicalSessionId(lsid);
-
- OperationContextSession unusedOperationContextSession(sideOpCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -234,8 +196,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsNotCheckedOut) {
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- opCtx->setLogicalSessionId(lsid);
- OperationContextSession unusedOperationContextSession(opCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -259,13 +220,13 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
// Make sure that the checkOutForKill call will wait for the owning operation context to
// check the session back in
- auto future = stdx::async(stdx::launch::async, [lsid] {
- ThreadClient tc(getGlobalServiceContext());
+ auto future = stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
sideOpCtx->setDeadlineAfterNowBy(Milliseconds(10), ErrorCodes::MaxTimeMSExpired);
- OperationContextSession unusedOperationContextSession(sideOpCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
});
ASSERT_THROWS_CODE(future.get(), AssertionException, ErrorCodes::MaxTimeMSExpired);
@@ -284,12 +245,10 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
// Schedule a separate "regular operation" thread, which will block on checking-out the session,
// which we will use to confirm that session kill completion actually unblocks check-out
- auto future = stdx::async(stdx::launch::async, [lsid] {
- ThreadClient tc(getGlobalServiceContext());
+ auto future = stdx::async(stdx::launch::async, [this, lsid] {
+ ThreadClient tc(getServiceContext());
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- sideOpCtx->setLogicalSessionId(lsid);
-
- OperationContextSession unusedOperationContextSession(sideOpCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
});
ASSERT(stdx::future_status::ready != future.wait_for(Milliseconds(10).toSystemDuration()));
@@ -303,8 +262,7 @@ TEST_F(SessionCatalogTest, KillSessionWhenSessionIsCheckedOut) {
// Make sure that session check-out after kill succeeds again
{
auto opCtx = makeOperationContext();
- opCtx->setLogicalSessionId(lsid);
- OperationContextSession unusedOperationContextSession(opCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
}
// Make sure the "regular operation" eventually is able to proceed and use the just killed
@@ -318,8 +276,7 @@ TEST_F(SessionCatalogTest, MarkSessionAsKilledThrowsWhenCalledTwice) {
// Create the session so there is something to kill
{
auto opCtx = makeOperationContext();
- opCtx->setLogicalSessionId(lsid);
- OperationContextSession unusedOperationContextSession(opCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(opCtx.get(), lsid));
}
auto killToken = catalog()->killSession(lsid);
@@ -394,14 +351,13 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
for (const auto& lsid : lsids) {
futures.emplace_back(
- stdx::async(stdx::launch::async, [lsid, &firstUseOfTheSessionReachedBarrier] {
- ThreadClient tc(getGlobalServiceContext());
+ stdx::async(stdx::launch::async, [this, lsid, &firstUseOfTheSessionReachedBarrier] {
+ ThreadClient tc(getServiceContext());
{
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
- sideOpCtx->setLogicalSessionId(lsid);
+ const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
- OperationContextSession unusedOperationContextSession(sideOpCtx.get());
firstUseOfTheSessionReachedBarrier.countDownAndWait();
ASSERT_THROWS_CODE(sideOpCtx->sleepFor(Hours{6}),
@@ -413,7 +369,7 @@ TEST_F(SessionCatalogTestWithDefaultOpCtx, KillSessionsThroughScanSessions) {
auto sideOpCtx = Client::getCurrent()->makeOperationContext();
sideOpCtx->setLogicalSessionId(lsid);
- OperationContextSession unusedOperationContextSession(sideOpCtx.get());
+ const auto unusedSession(catalog()->checkOutSession(sideOpCtx.get(), lsid));
}
}));
}
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index b873ecb43d7..99adc25c1ed 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -247,30 +247,6 @@ protected:
func(newOpCtx.get());
}
- void bumpTxnNumberFromDifferentOpCtx(const LogicalSessionId& sessionId, TxnNumber newTxnNum) {
- auto func = [sessionId, newTxnNum](OperationContext* opCtx) {
- auto session = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
- auto txnParticipant =
- TransactionParticipant::getFromNonCheckedOutSession(session.get());
-
- // Check that there is a transaction in progress with a lower txnNumber.
- ASSERT(txnParticipant->inMultiDocumentTransaction());
- ASSERT_LT(txnParticipant->getActiveTxnNumber(), newTxnNum);
-
- // Check that the transaction has some operations, so we can ensure they are cleared.
- ASSERT_GT(txnParticipant->transactionOperationsForTest().size(), 0u);
-
- // Bump the active transaction number on the txnParticipant. This should clear all state
- // from the previous transaction.
- txnParticipant->beginOrContinue(newTxnNum, boost::none, boost::none);
- ASSERT_EQ(newTxnNum, txnParticipant->getActiveTxnNumber());
- ASSERT_FALSE(txnParticipant->transactionIsAborted());
- ASSERT_EQ(txnParticipant->transactionOperationsForTest().size(), 0u);
- };
-
- runFunctionFromDifferentOpCtx(func);
- }
-
std::unique_ptr<MongoDOperationContextSession> checkOutSession() {
auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx());
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -643,28 +619,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndAbort) {
ErrorCodes::NoSuchTransaction);
}
-TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
-
- // The transaction machinery cannot store an empty locker.
- { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
- txnParticipant->stashTransactionResources(opCtx());
-
- // A migration may bump the active transaction number without checking out the
- // txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // An unstash after a migration that bumps the active transaction number should uassert.
- ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "insert"),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-}
-
TEST_F(TxnParticipantTest, ConcurrencyOfStashAndAbort) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -677,25 +631,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfStashAndAbort) {
txnParticipant->stashTransactionResources(opCtx());
}
-TEST_F(TxnParticipantTest, ConcurrencyOfStashAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- // A migration may bump the active transaction number without checking out the
- // txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // A stash after a migration that bumps the active transaction number should uassert.
- ASSERT_THROWS_CODE(txnParticipant->stashTransactionResources(opCtx()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-}
-
TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -711,26 +646,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) {
ErrorCodes::NoSuchTransaction);
}
-TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "find");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- // A migration may bump the active transaction number without checking out the
- // txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // An addTransactionOperation() after a migration that bumps the active transaction number
- // should uassert.
- ASSERT_THROWS_CODE(txnParticipant->addTransactionOperation(opCtx(), operation),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-}
-
TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -745,25 +660,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAb
ErrorCodes::NoSuchTransaction);
}
-TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- // A migration may bump the active transaction number without checking out the txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction
- // number should uassert.
- ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-}
-
TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndAbort) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -814,29 +710,6 @@ TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndArbitraryAbort)
ASSERT(opCtx()->getWriteUnitOfWork() == nullptr);
}
-TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
- ASSERT(txnParticipant->inMultiDocumentTransaction());
-
- // A migration may bump the active transaction number without checking out the txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- ASSERT_THROWS_CODE(txnParticipant->abortActiveTransaction(opCtx()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-
- // The abort fails so the OperationContext state is not cleaned up until the operation is
- // complete. The session has already moved on to a new transaction so the transaction will not
- // remain active beyond this operation.
- ASSERT_FALSE(opCtx()->getWriteUnitOfWork() == nullptr);
-}
-
TEST_F(TxnParticipantTest, ConcurrencyOfActivePreparedAbortAndArbitraryAbort) {
auto sessionCheckout = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());
@@ -1032,45 +905,6 @@ TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointTo
ASSERT_TRUE(txnParticipant->transactionIsAborted());
}
-TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- // A migration may bump the active transaction number without checking out the txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // A commitUnpreparedTransaction() after a migration that bumps the active transaction number
- // should uassert.
- ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
-}
-
-TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndMigration) {
- auto sessionCheckout = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- // A migration may bump the active transaction number without checking out the txnParticipant.
- const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
-
- // A prepareTransaction() after a migration that bumps the active transaction number should
- // uassert.
- ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
- ASSERT_FALSE(_opObserver->transactionPrepared);
-}
-
TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) {
// Check out a session, start the transaction and check it in.
checkOutSession();
@@ -1164,24 +998,29 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
-
+ OperationContextSession::checkIn(opCtx());
{
// Try to start a new transaction while there is already a prepared transaction on the
// session. This should fail with a PreparedTransactionInProgress error.
- auto func = [&](OperationContext* newOpCtx) {
- auto session = SessionCatalog::get(newOpCtx)->getOrCreateSession(
- newOpCtx, *opCtx()->getLogicalSessionId());
+ auto func = [
+ lsid = *opCtx()->getLogicalSessionId(),
+ txnNumberToStart = *opCtx()->getTxnNumber() + 1
+ ](OperationContext * newOpCtx) {
+ newOpCtx->setLogicalSessionId(lsid);
+ newOpCtx->setTxnNumber(txnNumberToStart);
+
+ auto session = SessionCatalog::get(newOpCtx)->checkOutSession(newOpCtx);
auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(session.get());
- ASSERT_THROWS_CODE(
- txnParticipant->beginOrContinue(*opCtx()->getTxnNumber() + 1, false, true),
- AssertionException,
- ErrorCodes::PreparedTransactionInProgress);
+ ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNumberToStart, false, true),
+ AssertionException,
+ ErrorCodes::PreparedTransactionInProgress);
};
runFunctionFromDifferentOpCtx(func);
}
+ OperationContextSession::checkOut(opCtx());
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
@@ -1205,26 +1044,6 @@ TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) {
ASSERT(_opObserver->transactionPrepared);
}
-TEST_F(TxnParticipantTest, MigrationThrowsOnPreparedTransaction) {
- auto outerScopedSession = checkOutSession();
- auto txnParticipant = TransactionParticipant::get(opCtx());
-
- txnParticipant->unstashTransactionResources(opCtx(), "insert");
- auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
- txnParticipant->addTransactionOperation(opCtx(), operation);
-
- txnParticipant->prepareTransaction(opCtx(), {});
-
- // A migration may bump the active transaction number without checking out the session.
- auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
- ASSERT_THROWS_CODE(
- bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum),
- AssertionException,
- ErrorCodes::PreparedTransactionInProgress);
- // The transaction is not affected.
- ASSERT_TRUE(_opObserver->transactionPrepared);
-}
-
TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) {
auto outerScopedSession = checkOutSession();
auto txnParticipant = TransactionParticipant::get(opCtx());