summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-05-31 18:51:03 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-03 01:38:58 +0000
commit116383210582edb4ba1f447380c25d0dc2469070 (patch)
tree0f211edecc25684693fffd7cf7461d8b90577f9c
parent5efce3b40fa798be3111cc3fcdb8a1167c3c94a5 (diff)
downloadmongo-116383210582edb4ba1f447380c25d0dc2469070.tar.gz
SERVER-66852 Eagerly erase retryable child sessions from SessionCatalog
(cherry picked from commit 8e96ecbf56a0763f4300aa38746ac86a8335d264)
-rw-r--r--src/mongo/db/logical_session_id_helpers.cpp5
-rw-r--r--src/mongo/db/logical_session_id_helpers.h6
-rw-r--r--src/mongo/db/session_catalog.cpp68
-rw-r--r--src/mongo/db/session_catalog.h30
-rw-r--r--src/mongo/db/transaction_participant.cpp8
-rw-r--r--src/mongo/db/transaction_participant_test.cpp444
-rw-r--r--src/mongo/s/transaction_router.cpp55
-rw-r--r--src/mongo/s/transaction_router_test.cpp87
8 files changed, 667 insertions, 36 deletions
diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp
index 0f312f0d30f..14576c6bd96 100644
--- a/src/mongo/db/logical_session_id_helpers.cpp
+++ b/src/mongo/db/logical_session_id_helpers.cpp
@@ -75,6 +75,11 @@ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) {
return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())});
}
+bool isParentSessionId(const LogicalSessionId& sessionId) {
+ // All child sessions must have a txnUUID.
+ return !sessionId.getTxnUUID();
+}
+
boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& sessionId) {
if (sessionId.getTxnUUID()) {
return LogicalSessionId{sessionId.getId(), sessionId.getUid()};
diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h
index 631ca0d979a..3d692c701e5 100644
--- a/src/mongo/db/logical_session_id_helpers.h
+++ b/src/mongo/db/logical_session_id_helpers.h
@@ -50,6 +50,12 @@ SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* o
SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db);
/**
+ * Returns if the given session is a parent session, ie only has fields that could have come from an
+ * external client.
+ */
+bool isParentSessionId(const LogicalSessionId& sessionId);
+
+/**
* Returns the parent session id for the given session id if there is one.
*/
boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& sessionId);
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index 8e66b4b1732..39fe678aa28 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -85,7 +85,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner(
stdx::unique_lock<Latch> ul(_mutex);
auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid);
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(ul, lsid);
invariant(session);
if (killToken) {
@@ -138,7 +138,7 @@ void SessionCatalog::scanSession(const LogicalSessionId& lsid,
stdx::lock_guard<Latch> lg(_mutex);
if (auto sri = _getSessionRuntimeInfo(lg, lsid)) {
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(lg, lsid);
invariant(session);
ObservableSession osession(lg, sri, session);
@@ -249,7 +249,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls
auto sri = _getSessionRuntimeInfo(lg, lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", sri);
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(lg, lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", session);
return ObservableSession(lg, sri, session).kill();
}
@@ -265,7 +265,7 @@ void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) {
}
SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo(
- WithLock, const LogicalSessionId& lsid) {
+ WithLock wl, const LogicalSessionId& lsid) {
auto parentLsid = castToParentSessionId(lsid);
auto sriIt = _sessions.find(parentLsid);
@@ -274,7 +274,7 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo(
}
auto sri = sriIt->second.get();
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(wl, lsid);
if (session) {
return sri;
@@ -312,7 +312,8 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeIn
void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
Session* session,
- boost::optional<KillToken> killToken) {
+ boost::optional<KillToken> killToken,
+ boost::optional<TxnNumber> clientTxnNumberStarted) {
stdx::lock_guard<Latch> lg(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
@@ -331,9 +332,30 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
invariant(sri->killsRequested > 0);
--sri->killsRequested;
}
+
+ if (clientTxnNumberStarted.has_value()) {
+ // Since the given txnNumber successfully started, we know any child sessions with older
+ // txnNumbers can be discarded. This needed to wait until a transaction started because that
+ // can fail, e.g. if the active transaction is prepared.
+ auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) {
+ ObservableSession osession(lg, sri, &it.second);
+ if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) {
+ osession.markForReap(ObservableSession::ReapMode::kExclusive);
+ }
+ return osession._shouldBeReaped();
+ });
+
+ LOGV2_DEBUG(6685200,
+ 4,
+ "Erased child sessions",
+ "releasedLsid"_attr = session->getSessionId(),
+ "clientTxnNumber"_attr = *clientTxnNumberStarted,
+ "childSessionsRemaining"_attr = sri->childSessions.size(),
+ "numReaped"_attr = numReaped);
+ }
}
-Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) {
+Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) {
if (lsid == parentSession._sessionId) {
return &parentSession;
}
@@ -478,4 +500,36 @@ void OperationContextSession::checkOut(OperationContext* opCtx) {
checkedOutSession.emplace(std::move(scopedCheckedOutSession));
}
+void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
+ auto& checkedOutSession = operationSessionDecoration(opCtx);
+ invariant(checkedOutSession);
+
+ LOGV2_DEBUG(6685201,
+ 4,
+ "Observing new retryable write number started on session",
+ "lsid"_attr = lsid,
+ "txnNumber"_attr = txnNumber);
+
+ const auto& checkedOutLsid = (*checkedOutSession)->getSessionId();
+ if (isParentSessionId(lsid)) {
+ // Observing a new transaction/retryable write on a parent session.
+
+ // The operation must have checked out the parent session itself or a child session of the
+ // parent. This is safe because both share the same SessionRuntimeInfo.
+ dassert(lsid == checkedOutLsid || lsid == *getParentSessionId(checkedOutLsid));
+
+ checkedOutSession->observeNewClientTxnNumberStarted(txnNumber);
+ } else if (isInternalSessionForRetryableWrite(lsid)) {
+ // Observing a new internal transaction on a retryable session.
+
+ // A transaction on a child session is always begun on an operation that checked it out
+ // directly.
+ dassert(lsid == checkedOutLsid);
+
+ checkedOutSession->observeNewClientTxnNumberStarted(*lsid.getTxnNumber());
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 937889c82cc..8186e2fcc60 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -152,7 +152,7 @@ private:
invariant(!getParentSessionId(lsid));
}
- Session* getSession(const LogicalSessionId& lsid);
+ Session* getSession(WithLock, const LogicalSessionId& lsid);
// Must only be accessed by the OperationContext which currently has this logical session
// checked out.
@@ -207,11 +207,13 @@ private:
SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid);
/**
- * Makes a session, previously checked out through 'checkoutSession', available again.
+ * Makes a session, previously checked out through 'checkoutSession', available again. Will free
+ * any retryable sessions with txnNumbers before clientTxnNumberStarted if it is set.
*/
void _releaseSession(SessionRuntimeInfo* sri,
Session* session,
- boost::optional<KillToken> killToken);
+ boost::optional<KillToken> killToken,
+ boost::optional<TxnNumber> clientTxnNumberStarted);
// Protects the state below
mutable Mutex _mutex =
@@ -239,6 +241,7 @@ public:
ScopedCheckedOutSession(ScopedCheckedOutSession&& other)
: _catalog(other._catalog),
+ _clientTxnNumberStarted(other._clientTxnNumberStarted),
_sri(other._sri),
_session(other._session),
_killToken(std::move(other._killToken)) {
@@ -251,7 +254,8 @@ public:
~ScopedCheckedOutSession() {
if (_sri) {
- _catalog._releaseSession(_sri, _session, std::move(_killToken));
+ _catalog._releaseSession(
+ _sri, _session, std::move(_killToken), _clientTxnNumberStarted);
}
}
@@ -275,10 +279,20 @@ public:
return bool(_killToken);
}
+ void observeNewClientTxnNumberStarted(TxnNumber txnNumber) {
+ _clientTxnNumberStarted = txnNumber;
+ }
+
private:
// The owning session catalog into which the session should be checked back
SessionCatalog& _catalog;
+ // If this session began a retryable write or transaction while checked out, this is set to the
+ // "client txnNumber" of that transaction, which is the top-level txnNumber for a retryable
+ // write or transaction sent by a client or the txnNumber in the sessionId for a retryable
+ // child transaction.
+ boost::optional<TxnNumber> _clientTxnNumberStarted;
+
SessionCatalog::SessionRuntimeInfo* _sri;
Session* _session;
boost::optional<SessionCatalog::KillToken> _killToken;
@@ -489,6 +503,14 @@ public:
static void checkIn(OperationContext* opCtx, CheckInReason reason);
static void checkOut(OperationContext* opCtx);
+ /**
+ * Notifies the session catalog when a new transaction/retryable write is begun on the operation
+ * context's checked out session.
+ */
+ static void observeNewTxnNumberStarted(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
+
private:
OperationContext* const _opCtx;
};
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 6aba931a08b..ba117c17afc 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -2732,6 +2732,14 @@ void TransactionParticipant::Participant::_setNewTxnNumberAndRetryCounter(
// Reset the transactional state
_resetTransactionStateAndUnlock(&lk, TransactionState::kNone);
+
+ invariant(!lk);
+ if (isParentSessionId(_sessionId())) {
+ // Only observe parent sessions because retryable transactions begin the same txnNumber on
+ // their parent session.
+ OperationContextSession::observeNewTxnNumberStarted(
+ opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
+ }
}
void RetryableWriteTransactionParticipantCatalog::addParticipant(
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 65c6419b370..a7f2ce3a27b 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/transaction_participant.h"
@@ -351,6 +352,33 @@ protected:
return opCtxSession;
}
+ void checkOutSessionFromDiferentOpCtx(const LogicalSessionId& lsid,
+ bool beginOrContinueTxn,
+ boost::optional<TxnNumber> txnNumber = boost::none,
+ boost::optional<bool> autocommit = boost::none,
+ boost::optional<bool> startTransaction = boost::none,
+ bool commitTxn = false) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ if (txnNumber) {
+ opCtx->setTxnNumber(*txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ }
+
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ if (beginOrContinueTxn) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, autocommit, startTransaction);
+
+ if (commitTxn) {
+ txnParticipant.commitUnpreparedTransaction(opCtx);
+ }
+ }
+ });
+ }
+
const LogicalSessionId _sessionId{makeLogicalSessionIdForTest()};
const TxnNumber _txnNumber{20};
const UUID _uuid = UUID::gen();
@@ -1663,6 +1691,30 @@ protected:
TxnParticipantTest::tearDown();
}
+ void runRetryableWrite(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none);
+ });
+ }
+
+ void runAndCommitTransaction(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, true);
+ txnParticipant.unstashTransactionResources(opCtx, "find");
+ txnParticipant.commitUnpreparedTransaction(opCtx);
+ });
+ }
+
RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true};
};
@@ -5433,5 +5485,397 @@ TEST_F(TxnParticipantTest,
}
}
+bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
+ bool existsInCatalog{false};
+ sessionCatalog->scanSession(lsid,
+ [&](const ObservableSession& session) { existsInCatalog = true; });
+ return existsInCatalog;
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runAndCommitTransaction(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientRetryableWrite) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable write and verify the child was erased.
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(higherRetryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(
+ ShardTxnParticipantTest,
+ EagerlyReapRetryableSessionsOnlyUponNewTransactionBegunAndIgnoresNonRetryableAndUnrelatedSessions) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children and one non-retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ auto nonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid);
+ runAndCommitTransaction(nonRetryableChildLsid, 0);
+
+ // Add entries for unrelated sessions to verify they aren't affected.
+
+ auto parentLsidOther = makeLogicalSessionIdForTest();
+ runRetryableWrite(parentLsidOther, 0);
+
+ auto retryableChildLsidOther =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsidOther, 0);
+
+ auto nonRetryableChildLsidOther =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber);
+ runAndCommitTransaction(nonRetryableChildLsidOther, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+
+ // Check out with a higher txnNumber and verify we don't reap until a transaction has begun on
+ // it.
+
+ parentTxnNumber++;
+
+ // Does not call beginOrContinue.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+ // Does not call beginOrContinue.
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(higherRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+ // beginOrContinue fails because no startTransaction=true.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ ASSERT_THROWS_CODE(
+ txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, boost::none),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ });
+ // Non-retryable child sessions shouldn't affect retryable sessions.
+ auto newNonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid);
+ runAndCommitTransaction(newNonRetryableChildLsid, 0);
+
+ // No sessions should have been reaped.
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+
+ // Call beginOrContinue for a higher txnNumber and verify we do erase old sessions for the
+ // active session.
+
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ // The two retryable children for parentLsid should have been reaped.
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapLowerTxnNumbers) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ auto lowerRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber - 1);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lowerRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsHigherUnusedTxnNumbers) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Check out a higher txnNumber retryable transaction but do not start it and verify it does not
+ // reap and is not reaped.
+
+ auto higherUnusedRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber + 10);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(higherUnusedRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog));
+
+ parentTxnNumber++; // Still less than in higherUnusedRetryableChildLsid.
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsKilledSessions) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ // Kill one retryable child and verify no sessions in its SRI can be reaped until it has been
+ // checked out by its killer.
+
+ parentTxnNumber++;
+ boost::optional<SessionCatalog::KillToken> killToken;
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ TransactionParticipant::get(opCtx).beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none);
+
+ // Kill after checking out the session because we can't check out the session again
+ // after a kill without checking out with the killToken first.
+ killToken = sessionCatalog->killSession(retryableChildLsid1);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ // Check out for kill the killed retryable session and now both retryable sessions can be
+ // reaped.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ sessionCatalog->checkOutSessionForKill(opCtx, std::move(*killToken));
+ });
+
+ // A new client txnNumber must be seen to trigger the reaping, so the sessions shouldn't have
+ // been reaped upon releasing the killed session.
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, CheckingOutEagerlyReapedSessionDoesNotCrash) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runAndCommitTransaction(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Check out the child again to verify this doesn't crash.
+ ASSERT_THROWS_CODE(runAndCommitTransaction(retryableChildLsid, 1),
+ AssertionException,
+ ErrorCodes::TransactionTooOld);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index e81077d32d5..db0cc68ca61 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -1434,34 +1434,39 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con
void TransactionRouter::Router::_resetRouterState(
OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Cannot start a new transaction while the previous is yielded",
- o(lk).activeYields == 0);
-
- o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
- o(lk).txnNumberAndRetryCounter.setTxnRetryCounter(
- *txnNumberAndRetryCounter.getTxnRetryCounter());
- o(lk).commitType = CommitType::kNotInitiated;
- p().isRecoveringCommit = false;
- o(lk).participants.clear();
- o(lk).coordinatorId.reset();
- p().recoveryShardId.reset();
- o(lk).apiParameters = {};
- o(lk).readConcernArgs = {};
- o(lk).atClusterTime.reset();
- o(lk).abortCause = std::string();
- o(lk).metricsTracker.emplace(opCtx->getServiceContext());
- p().terminationInitiated = false;
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Cannot start a new transaction while the previous is yielded",
+ o(lk).activeYields == 0);
- auto tickSource = opCtx->getServiceContext()->getTickSource();
- o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+ o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
+ o(lk).txnNumberAndRetryCounter.setTxnRetryCounter(
+ *txnNumberAndRetryCounter.getTxnRetryCounter());
+ o(lk).commitType = CommitType::kNotInitiated;
+ p().isRecoveringCommit = false;
+ o(lk).participants.clear();
+ o(lk).coordinatorId.reset();
+ p().recoveryShardId.reset();
+ o(lk).apiParameters = {};
+ o(lk).readConcernArgs = {};
+ o(lk).atClusterTime.reset();
+ o(lk).abortCause = std::string();
+ o(lk).metricsTracker.emplace(opCtx->getServiceContext());
+ p().terminationInitiated = false;
+
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
+ o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+
+ // TODO SERVER-37115: Parse statement ids from the client and remember the statement id
+ // of the command that started the transaction, if one was included.
+ p().latestStmtId = kDefaultFirstStmtId;
+ p().firstStmtId = kDefaultFirstStmtId;
+ }
- // TODO SERVER-37115: Parse statement ids from the client and remember the statement id
- // of the command that started the transaction, if one was included.
- p().latestStmtId = kDefaultFirstStmtId;
- p().firstStmtId = kDefaultFirstStmtId;
+ OperationContextSession::observeNewTxnNumberStarted(
+ opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
};
void TransactionRouter::Router::_resetRouterStateForStartTransaction(
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index 94a57f1ac2d..1191e92d11e 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -198,6 +198,26 @@ protected:
});
}
+ void runFunctionFromDifferentOpCtx(std::function<void(OperationContext*)> func) {
+ auto newClientOwned = getServiceContext()->makeClient("newClient");
+ AlternativeClientRegion acr(newClientOwned);
+ auto newOpCtx = cc().makeOperationContext();
+ func(newOpCtx.get());
+ }
+
+ void runTransactionLeaveOpen(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<RouterOperationContextSession>(opCtx);
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter.beginOrContinueTxn(
+ opCtx, *opCtx->getTxnNumber(), TransactionRouter::TransactionActions::kStart);
+ });
+ }
+
private:
// Enables the transaction router to retry within a transaction on stale version and snapshot
// errors for the duration of each test.
@@ -5300,5 +5320,72 @@ TEST_F(TransactionRouterMetricsTest, IsTrackingOverIfTxnImplicitlyAborted) {
implicitAbortInProgress();
ASSERT(txnRouter().isTrackingOver());
}
+
+bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
+ bool existsInCatalog{false};
+ sessionCatalog->scanSession(lsid,
+ [&](const ObservableSession& session) { existsInCatalog = true; });
+ return existsInCatalog;
+}
+
+TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewClientTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(higherRetryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+}
+
} // unnamed namespace
} // namespace mongo