summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJack Mulrow <jack.mulrow@mongodb.com>2022-05-31 18:51:03 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-06-02 22:31:59 +0000
commit8e96ecbf56a0763f4300aa38746ac86a8335d264 (patch)
treeea895c72b98015ac61ed0e4c56711c1bb83a2424
parent479b25e2af6cc20b8a9cb81f62f2864a13da446f (diff)
downloadmongo-8e96ecbf56a0763f4300aa38746ac86a8335d264.tar.gz
SERVER-66852 Eagerly erase retryable child sessions from SessionCatalog
-rw-r--r--src/mongo/db/logical_session_id_helpers.cpp5
-rw-r--r--src/mongo/db/logical_session_id_helpers.h6
-rw-r--r--src/mongo/db/session_catalog.cpp68
-rw-r--r--src/mongo/db/session_catalog.h30
-rw-r--r--src/mongo/db/transaction_participant.cpp8
-rw-r--r--src/mongo/db/transaction_participant_test.cpp444
-rw-r--r--src/mongo/s/transaction_router.cpp55
-rw-r--r--src/mongo/s/transaction_router_test.cpp87
8 files changed, 667 insertions, 36 deletions
diff --git a/src/mongo/db/logical_session_id_helpers.cpp b/src/mongo/db/logical_session_id_helpers.cpp
index 035b54b061d..c780813e68a 100644
--- a/src/mongo/db/logical_session_id_helpers.cpp
+++ b/src/mongo/db/logical_session_id_helpers.cpp
@@ -84,6 +84,11 @@ SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db) {
return SHA256Block::computeHash({ConstDataRange(fn.c_str(), fn.size())});
}
+bool isParentSessionId(const LogicalSessionId& sessionId) {
+ // All child sessions must have a txnUUID.
+ return !sessionId.getTxnUUID();
+}
+
boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& sessionId) {
if (sessionId.getTxnUUID()) {
return LogicalSessionId{sessionId.getId(), sessionId.getUid()};
diff --git a/src/mongo/db/logical_session_id_helpers.h b/src/mongo/db/logical_session_id_helpers.h
index 631ca0d979a..3d692c701e5 100644
--- a/src/mongo/db/logical_session_id_helpers.h
+++ b/src/mongo/db/logical_session_id_helpers.h
@@ -50,6 +50,12 @@ SHA256Block getLogicalSessionUserDigestForLoggedInUser(const OperationContext* o
SHA256Block getLogicalSessionUserDigestFor(StringData user, StringData db);
/**
+ * Returns if the given session is a parent session, ie only has fields that could have come from an
+ * external client.
+ */
+bool isParentSessionId(const LogicalSessionId& sessionId);
+
+/**
* Returns the parent session id for the given session id if there is one.
*/
boost::optional<LogicalSessionId> getParentSessionId(const LogicalSessionId& sessionId);
diff --git a/src/mongo/db/session_catalog.cpp b/src/mongo/db/session_catalog.cpp
index a0c97676370..7760427aa9a 100644
--- a/src/mongo/db/session_catalog.cpp
+++ b/src/mongo/db/session_catalog.cpp
@@ -89,7 +89,7 @@ SessionCatalog::ScopedCheckedOutSession SessionCatalog::_checkOutSessionInner(
stdx::unique_lock<Latch> ul(_mutex);
auto sri = _getOrCreateSessionRuntimeInfo(ul, lsid);
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(ul, lsid);
invariant(session);
if (killToken) {
@@ -148,7 +148,7 @@ void SessionCatalog::scanSession(const LogicalSessionId& lsid,
stdx::lock_guard<Latch> lg(_mutex);
if (auto sri = _getSessionRuntimeInfo(lg, lsid)) {
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(lg, lsid);
invariant(session);
ObservableSession osession(lg, sri, session);
@@ -259,7 +259,7 @@ SessionCatalog::KillToken SessionCatalog::killSession(const LogicalSessionId& ls
auto sri = _getSessionRuntimeInfo(lg, lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", sri);
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(lg, lsid);
uassert(ErrorCodes::NoSuchSession, "Session not found", session);
return ObservableSession(lg, sri, session).kill();
}
@@ -275,7 +275,7 @@ void SessionCatalog::createSessionIfDoesNotExist(const LogicalSessionId& lsid) {
}
SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo(
- WithLock, const LogicalSessionId& lsid) {
+ WithLock wl, const LogicalSessionId& lsid) {
auto parentLsid = castToParentSessionId(lsid);
auto sriIt = _sessions.find(parentLsid);
@@ -284,7 +284,7 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getSessionRuntimeInfo(
}
auto sri = sriIt->second.get();
- auto session = sri->getSession(lsid);
+ auto session = sri->getSession(wl, lsid);
if (session) {
return sri;
@@ -322,7 +322,8 @@ SessionCatalog::SessionRuntimeInfo* SessionCatalog::_getOrCreateSessionRuntimeIn
void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
Session* session,
- boost::optional<KillToken> killToken) {
+ boost::optional<KillToken> killToken,
+ boost::optional<TxnNumber> clientTxnNumberStarted) {
stdx::lock_guard<Latch> lg(_mutex);
// Make sure we have exactly the same session on the map and that it is still associated with an
@@ -341,9 +342,30 @@ void SessionCatalog::_releaseSession(SessionRuntimeInfo* sri,
invariant(sri->killsRequested > 0);
--sri->killsRequested;
}
+
+ if (clientTxnNumberStarted.has_value()) {
+ // Since the given txnNumber successfully started, we know any child sessions with older
+ // txnNumbers can be discarded. This needed to wait until a transaction started because that
+ // can fail, e.g. if the active transaction is prepared.
+ auto numReaped = stdx::erase_if(sri->childSessions, [&](auto&& it) {
+ ObservableSession osession(lg, sri, &it.second);
+ if (it.first.getTxnNumber() && *it.first.getTxnNumber() < *clientTxnNumberStarted) {
+ osession.markForReap(ObservableSession::ReapMode::kExclusive);
+ }
+ return osession._shouldBeReaped();
+ });
+
+ LOGV2_DEBUG(6685200,
+ 4,
+ "Erased child sessions",
+ "releasedLsid"_attr = session->getSessionId(),
+ "clientTxnNumber"_attr = *clientTxnNumberStarted,
+ "childSessionsRemaining"_attr = sri->childSessions.size(),
+ "numReaped"_attr = numReaped);
+ }
}
-Session* SessionCatalog::SessionRuntimeInfo::getSession(const LogicalSessionId& lsid) {
+Session* SessionCatalog::SessionRuntimeInfo::getSession(WithLock, const LogicalSessionId& lsid) {
if (lsid == parentSession._sessionId) {
return &parentSession;
}
@@ -488,4 +510,36 @@ void OperationContextSession::checkOut(OperationContext* opCtx) {
checkedOutSession.emplace(std::move(scopedCheckedOutSession));
}
+void OperationContextSession::observeNewTxnNumberStarted(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber) {
+ auto& checkedOutSession = operationSessionDecoration(opCtx);
+ invariant(checkedOutSession);
+
+ LOGV2_DEBUG(6685201,
+ 4,
+ "Observing new retryable write number started on session",
+ "lsid"_attr = lsid,
+ "txnNumber"_attr = txnNumber);
+
+ const auto& checkedOutLsid = (*checkedOutSession)->getSessionId();
+ if (isParentSessionId(lsid)) {
+ // Observing a new transaction/retryable write on a parent session.
+
+ // The operation must have checked out the parent session itself or a child session of the
+ // parent. This is safe because both share the same SessionRuntimeInfo.
+ dassert(lsid == checkedOutLsid || lsid == *getParentSessionId(checkedOutLsid));
+
+ checkedOutSession->observeNewClientTxnNumberStarted(txnNumber);
+ } else if (isInternalSessionForRetryableWrite(lsid)) {
+ // Observing a new internal transaction on a retryable session.
+
+ // A transaction on a child session is always begun on an operation that checked it out
+ // directly.
+ dassert(lsid == checkedOutLsid);
+
+ checkedOutSession->observeNewClientTxnNumberStarted(*lsid.getTxnNumber());
+ }
+}
+
} // namespace mongo
diff --git a/src/mongo/db/session_catalog.h b/src/mongo/db/session_catalog.h
index 937889c82cc..8186e2fcc60 100644
--- a/src/mongo/db/session_catalog.h
+++ b/src/mongo/db/session_catalog.h
@@ -152,7 +152,7 @@ private:
invariant(!getParentSessionId(lsid));
}
- Session* getSession(const LogicalSessionId& lsid);
+ Session* getSession(WithLock, const LogicalSessionId& lsid);
// Must only be accessed by the OperationContext which currently has this logical session
// checked out.
@@ -207,11 +207,13 @@ private:
SessionRuntimeInfo* _getOrCreateSessionRuntimeInfo(WithLock lk, const LogicalSessionId& lsid);
/**
- * Makes a session, previously checked out through 'checkoutSession', available again.
+ * Makes a session, previously checked out through 'checkoutSession', available again. Will free
+ * any retryable sessions with txnNumbers before clientTxnNumberStarted if it is set.
*/
void _releaseSession(SessionRuntimeInfo* sri,
Session* session,
- boost::optional<KillToken> killToken);
+ boost::optional<KillToken> killToken,
+ boost::optional<TxnNumber> clientTxnNumberStarted);
// Protects the state below
mutable Mutex _mutex =
@@ -239,6 +241,7 @@ public:
ScopedCheckedOutSession(ScopedCheckedOutSession&& other)
: _catalog(other._catalog),
+ _clientTxnNumberStarted(other._clientTxnNumberStarted),
_sri(other._sri),
_session(other._session),
_killToken(std::move(other._killToken)) {
@@ -251,7 +254,8 @@ public:
~ScopedCheckedOutSession() {
if (_sri) {
- _catalog._releaseSession(_sri, _session, std::move(_killToken));
+ _catalog._releaseSession(
+ _sri, _session, std::move(_killToken), _clientTxnNumberStarted);
}
}
@@ -275,10 +279,20 @@ public:
return bool(_killToken);
}
+ void observeNewClientTxnNumberStarted(TxnNumber txnNumber) {
+ _clientTxnNumberStarted = txnNumber;
+ }
+
private:
// The owning session catalog into which the session should be checked back
SessionCatalog& _catalog;
+ // If this session began a retryable write or transaction while checked out, this is set to the
+ // "client txnNumber" of that transaction, which is the top-level txnNumber for a retryable
+ // write or transaction sent by a client or the txnNumber in the sessionId for a retryable
+ // child transaction.
+ boost::optional<TxnNumber> _clientTxnNumberStarted;
+
SessionCatalog::SessionRuntimeInfo* _sri;
Session* _session;
boost::optional<SessionCatalog::KillToken> _killToken;
@@ -489,6 +503,14 @@ public:
static void checkIn(OperationContext* opCtx, CheckInReason reason);
static void checkOut(OperationContext* opCtx);
+ /**
+ * Notifies the session catalog when a new transaction/retryable write is begun on the operation
+ * context's checked out session.
+ */
+ static void observeNewTxnNumberStarted(OperationContext* opCtx,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber);
+
private:
OperationContext* const _opCtx;
};
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 6944763d31b..0a8495b859c 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -2733,6 +2733,14 @@ void TransactionParticipant::Participant::_setNewTxnNumberAndRetryCounter(
// Reset the transactional state
_resetTransactionStateAndUnlock(&lk, TransactionState::kNone);
+
+ invariant(!lk);
+ if (isParentSessionId(_sessionId())) {
+ // Only observe parent sessions because retryable transactions begin the same txnNumber on
+ // their parent session.
+ OperationContextSession::observeNewTxnNumberStarted(
+ opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
+ }
}
void RetryableWriteTransactionParticipantCatalog::addParticipant(
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 37cca9a0099..736020b0bf1 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -47,6 +47,7 @@
#include "mongo/db/repl/storage_interface_mock.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
+#include "mongo/db/session_catalog.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/transaction_participant.h"
@@ -350,6 +351,33 @@ protected:
return opCtxSession;
}
+ void checkOutSessionFromDiferentOpCtx(const LogicalSessionId& lsid,
+ bool beginOrContinueTxn,
+ boost::optional<TxnNumber> txnNumber = boost::none,
+ boost::optional<bool> autocommit = boost::none,
+ boost::optional<bool> startTransaction = boost::none,
+ bool commitTxn = false) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ if (txnNumber) {
+ opCtx->setTxnNumber(*txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ }
+
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ if (beginOrContinueTxn) {
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, autocommit, startTransaction);
+
+ if (commitTxn) {
+ txnParticipant.commitUnpreparedTransaction(opCtx);
+ }
+ }
+ });
+ }
+
const LogicalSessionId _sessionId{makeLogicalSessionIdForTest()};
const TxnNumber _txnNumber{20};
const UUID _uuid = UUID::gen();
@@ -1662,6 +1690,30 @@ protected:
TxnParticipantTest::tearDown();
}
+ void runRetryableWrite(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ TransactionParticipant::get(opCtx).beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none);
+ });
+ }
+
+ void runAndCommitTransaction(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, true);
+ txnParticipant.unstashTransactionResources(opCtx, "find");
+ txnParticipant.commitUnpreparedTransaction(opCtx);
+ });
+ }
+
RAIIServerParameterControllerForTest _controller{"featureFlagInternalTransactions", true};
};
@@ -5432,5 +5484,397 @@ TEST_F(TxnParticipantTest,
}
}
+bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
+ bool existsInCatalog{false};
+ sessionCatalog->scanSession(lsid,
+ [&](const ObservableSession& session) { existsInCatalog = true; });
+ return existsInCatalog;
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runAndCommitTransaction(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewClientRetryableWrite) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable write and verify the child was erased.
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(higherRetryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(
+ ShardTxnParticipantTest,
+ EagerlyReapRetryableSessionsOnlyUponNewTransactionBegunAndIgnoresNonRetryableAndUnrelatedSessions) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children and one non-retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ auto nonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid);
+ runAndCommitTransaction(nonRetryableChildLsid, 0);
+
+ // Add entries for unrelated sessions to verify they aren't affected.
+
+ auto parentLsidOther = makeLogicalSessionIdForTest();
+ runRetryableWrite(parentLsidOther, 0);
+
+ auto retryableChildLsidOther =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsidOther, 0);
+
+ auto nonRetryableChildLsidOther =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsidOther, parentTxnNumber);
+ runAndCommitTransaction(nonRetryableChildLsidOther, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+
+ // Check out with a higher txnNumber and verify we don't reap until a transaction has begun on
+ // it.
+
+ parentTxnNumber++;
+
+ // Does not call beginOrContinue.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+ // Does not call beginOrContinue.
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(higherRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+ // beginOrContinue fails because no startTransaction=true.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ auto txnParticipant = TransactionParticipant::get(opCtx);
+ ASSERT_THROWS_CODE(
+ txnParticipant.beginOrContinue(opCtx, {*opCtx->getTxnNumber()}, false, boost::none),
+ AssertionException,
+ ErrorCodes::NoSuchTransaction);
+ });
+ // Non-retryable child sessions shouldn't affect retryable sessions.
+ auto newNonRetryableChildLsid = makeLogicalSessionIdWithTxnUUIDForTest(parentLsid);
+ runAndCommitTransaction(newNonRetryableChildLsid, 0);
+
+ // No sessions should have been reaped.
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+
+ // Call beginOrContinue for a higher txnNumber and verify we do erase old sessions for the
+ // active session.
+
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ // The two retryable children for parentLsid should have been reaped.
+ ASSERT_EQ(sessionCatalog->size(), 2);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(parentLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsidOther, sessionCatalog));
+ ASSERT(doesExistInCatalog(nonRetryableChildLsidOther, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapLowerTxnNumbers) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ auto lowerRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber - 1);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lowerRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(lowerRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsHigherUnusedTxnNumbers) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Check out a higher txnNumber retryable transaction but do not start it and verify it does not
+ // reap and is not reaped.
+
+ auto higherUnusedRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber + 10);
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(higherUnusedRetryableChildLsid);
+ opCtx->setTxnNumber(0);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog));
+
+ parentTxnNumber++; // Still less than in higherUnusedRetryableChildLsid.
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherUnusedRetryableChildLsid, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, EagerlyReapSkipsKilledSessions) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with two retryable children.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 1;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid1 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid1, 0);
+
+ auto retryableChildLsid2 =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid2, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ // Kill one retryable child and verify no sessions in its SRI can be reaped until it has been
+ // checked out by its killer.
+
+ parentTxnNumber++;
+ boost::optional<SessionCatalog::KillToken> killToken;
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(parentLsid);
+ opCtx->setTxnNumber(parentTxnNumber);
+ auto opCtxSession = std::make_unique<MongoDOperationContextSession>(opCtx);
+
+ TransactionParticipant::get(opCtx).beginOrContinue(
+ opCtx, {*opCtx->getTxnNumber()}, boost::none, boost::none);
+
+ // Kill after checking out the session because we can't check out the session again
+ // after a kill without checking out with the killToken first.
+ killToken = sessionCatalog->killSession(retryableChildLsid1);
+ });
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ // Check out for kill the killed retryable session and now both retryable sessions can be
+ // reaped.
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ sessionCatalog->checkOutSessionForKill(opCtx, std::move(*killToken));
+ });
+
+ // A new client txnNumber must be seen to trigger the reaping, so the sessions shouldn't have
+ // been reaped upon releasing the killed session.
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+
+ parentTxnNumber++;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid1, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid2, sessionCatalog));
+}
+
+TEST_F(ShardTxnParticipantTest, CheckingOutEagerlyReapedSessionDoesNotCrash) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runRetryableWrite(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runAndCommitTransaction(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runAndCommitTransaction(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Check out the child again to verify this doesn't crash.
+ ASSERT_THROWS_CODE(runAndCommitTransaction(retryableChildLsid, 1),
+ AssertionException,
+ ErrorCodes::TransactionTooOld);
+}
+
} // namespace
} // namespace mongo
diff --git a/src/mongo/s/transaction_router.cpp b/src/mongo/s/transaction_router.cpp
index 955fb56861e..1368e3dc342 100644
--- a/src/mongo/s/transaction_router.cpp
+++ b/src/mongo/s/transaction_router.cpp
@@ -1436,34 +1436,39 @@ void TransactionRouter::Router::appendRecoveryToken(BSONObjBuilder* builder) con
void TransactionRouter::Router::_resetRouterState(
OperationContext* opCtx, const TxnNumberAndRetryCounter& txnNumberAndRetryCounter) {
- stdx::lock_guard<Client> lk(*opCtx->getClient());
+ {
+ stdx::lock_guard<Client> lk(*opCtx->getClient());
- uassert(ErrorCodes::ConflictingOperationInProgress,
- "Cannot start a new transaction while the previous is yielded",
- o(lk).activeYields == 0);
-
- o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
- o(lk).txnNumberAndRetryCounter.setTxnRetryCounter(
- *txnNumberAndRetryCounter.getTxnRetryCounter());
- o(lk).commitType = CommitType::kNotInitiated;
- p().isRecoveringCommit = false;
- o(lk).participants.clear();
- o(lk).coordinatorId.reset();
- p().recoveryShardId.reset();
- o(lk).apiParameters = {};
- o(lk).readConcernArgs = {};
- o(lk).atClusterTime.reset();
- o(lk).abortCause = std::string();
- o(lk).metricsTracker.emplace(opCtx->getServiceContext());
- p().terminationInitiated = false;
+ uassert(ErrorCodes::ConflictingOperationInProgress,
+ "Cannot start a new transaction while the previous is yielded",
+ o(lk).activeYields == 0);
- auto tickSource = opCtx->getServiceContext()->getTickSource();
- o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+ o(lk).txnNumberAndRetryCounter.setTxnNumber(txnNumberAndRetryCounter.getTxnNumber());
+ o(lk).txnNumberAndRetryCounter.setTxnRetryCounter(
+ *txnNumberAndRetryCounter.getTxnRetryCounter());
+ o(lk).commitType = CommitType::kNotInitiated;
+ p().isRecoveringCommit = false;
+ o(lk).participants.clear();
+ o(lk).coordinatorId.reset();
+ p().recoveryShardId.reset();
+ o(lk).apiParameters = {};
+ o(lk).readConcernArgs = {};
+ o(lk).atClusterTime.reset();
+ o(lk).abortCause = std::string();
+ o(lk).metricsTracker.emplace(opCtx->getServiceContext());
+ p().terminationInitiated = false;
+
+ auto tickSource = opCtx->getServiceContext()->getTickSource();
+ o(lk).metricsTracker->trySetActive(tickSource, tickSource->getTicks());
+
+ // TODO SERVER-37115: Parse statement ids from the client and remember the statement id
+ // of the command that started the transaction, if one was included.
+ p().latestStmtId = kDefaultFirstStmtId;
+ p().firstStmtId = kDefaultFirstStmtId;
+ }
- // TODO SERVER-37115: Parse statement ids from the client and remember the statement id
- // of the command that started the transaction, if one was included.
- p().latestStmtId = kDefaultFirstStmtId;
- p().firstStmtId = kDefaultFirstStmtId;
+ OperationContextSession::observeNewTxnNumberStarted(
+ opCtx, _sessionId(), txnNumberAndRetryCounter.getTxnNumber());
};
void TransactionRouter::Router::_resetRouterStateForStartTransaction(
diff --git a/src/mongo/s/transaction_router_test.cpp b/src/mongo/s/transaction_router_test.cpp
index addefae213c..ef933e911c1 100644
--- a/src/mongo/s/transaction_router_test.cpp
+++ b/src/mongo/s/transaction_router_test.cpp
@@ -200,6 +200,26 @@ protected:
});
}
+ void runFunctionFromDifferentOpCtx(std::function<void(OperationContext*)> func) {
+ auto newClientOwned = getServiceContext()->makeClient("newClient");
+ AlternativeClientRegion acr(newClientOwned);
+ auto newOpCtx = cc().makeOperationContext();
+ func(newOpCtx.get());
+ }
+
+ void runTransactionLeaveOpen(LogicalSessionId lsid, TxnNumber txnNumber) {
+ runFunctionFromDifferentOpCtx([&](OperationContext* opCtx) {
+ opCtx->setLogicalSessionId(lsid);
+ opCtx->setTxnNumber(txnNumber);
+ opCtx->setInMultiDocumentTransaction();
+ auto opCtxSession = std::make_unique<RouterOperationContextSession>(opCtx);
+
+ auto txnRouter = TransactionRouter::get(opCtx);
+ txnRouter.beginOrContinueTxn(
+ opCtx, *opCtx->getTxnNumber(), TransactionRouter::TransactionActions::kStart);
+ });
+ }
+
private:
// Enables the transaction router to retry within a transaction on stale version and snapshot
// errors for the duration of each test.
@@ -5302,5 +5322,72 @@ TEST_F(TransactionRouterMetricsTest, IsTrackingOverIfTxnImplicitlyAborted) {
implicitAbortInProgress();
ASSERT(txnRouter().isTrackingOver());
}
+
+bool doesExistInCatalog(const LogicalSessionId& lsid, SessionCatalog* sessionCatalog) {
+ bool existsInCatalog{false};
+ sessionCatalog->scanSession(lsid,
+ [&](const ObservableSession& session) { existsInCatalog = true; });
+ return existsInCatalog;
+}
+
+TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewClientTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber client transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+}
+
+TEST_F(TransactionRouterTest, EagerlyReapRetryableSessionsUponNewRetryableTransaction) {
+ auto sessionCatalog = SessionCatalog::get(getServiceContext());
+ ASSERT_EQ(sessionCatalog->size(), 0);
+
+ // Add a parent session with one retryable child.
+
+ auto parentLsid = makeLogicalSessionIdForTest();
+ auto parentTxnNumber = 0;
+ runTransactionLeaveOpen(parentLsid, parentTxnNumber);
+
+ auto retryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(retryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+
+ // Start a higher txnNumber retryable transaction and verify the child was erased.
+
+ parentTxnNumber++;
+ auto higherRetryableChildLsid =
+ makeLogicalSessionIdWithTxnNumberAndUUIDForTest(parentLsid, parentTxnNumber);
+ runTransactionLeaveOpen(higherRetryableChildLsid, 0);
+
+ ASSERT_EQ(sessionCatalog->size(), 1);
+ ASSERT(doesExistInCatalog(parentLsid, sessionCatalog));
+ ASSERT_FALSE(doesExistInCatalog(retryableChildLsid, sessionCatalog));
+ ASSERT(doesExistInCatalog(higherRetryableChildLsid, sessionCatalog));
+}
+
} // unnamed namespace
} // namespace mongo