summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant_retryable_writes_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_participant_retryable_writes_test.cpp')
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp170
1 files changed, 75 insertions, 95 deletions
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 5914862a591..f3900a43b5f 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -145,15 +145,16 @@ protected:
MongoDSessionCatalog::onStepUp(opCtx());
const auto service = opCtx()->getServiceContext();
- OpObserverRegistry* opObserverRegistry =
- dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
- auto mockObserver = stdx::make_unique<OpObserverMock>();
- _opObserver = mockObserver.get();
- opObserverRegistry->addObserver(std::move(mockObserver));
+
+ const auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ opObserverRegistry->addObserver(stdx::make_unique<OpObserverMock>());
+
+ opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ _opContextSession.emplace(opCtx());
}
void tearDown() final {
- _opObserver = nullptr;
+ _opContextSession.reset();
MockReplCoordServerFixture::tearDown();
}
@@ -196,16 +197,16 @@ protected:
OplogSlot());
}
- repl::OpTime writeTxnRecord(Session* session,
- TxnNumber txnNum,
+ repl::OpTime writeTxnRecord(TxnNumber txnNum,
StmtId stmtId,
repl::OpTime prevOpTime,
boost::optional<DurableTxnStateEnum> txnState) {
- const auto uuid = UUID::gen();
-
+ const auto session = OperationContextSession::get(opCtx());
const auto txnParticipant = TransactionParticipant::get(session);
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
+ const auto uuid = UUID::gen();
+
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime =
@@ -217,11 +218,12 @@ protected:
return opTime;
}
- void assertTxnRecord(Session* session,
- TxnNumber txnNum,
+ void assertTxnRecord(TxnNumber txnNum,
StmtId stmtId,
repl::OpTime opTime,
boost::optional<DurableTxnStateEnum> txnState) {
+ const auto session = OperationContextSession::get(opCtx());
+
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
{BSON("_id" << session->getSessionId().toBSON())});
@@ -243,23 +245,21 @@ protected:
ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
}
- OpObserverMock* _opObserver = nullptr;
+private:
+ boost::optional<OperationContextSession> _opContextSession;
};
TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 20;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
-
- ASSERT_EQ(sessionId, session.getSessionId());
ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
DBDirectClient client(opCtx());
@@ -270,15 +270,14 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin)
}
TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 21;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none);
+ const auto opTime = writeTxnRecord(txnNum, 0, {}, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -298,13 +297,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit
TEST_F(TransactionParticipantRetryableWritesTest,
StartingNewerTransactionUpdatesThePersistedSession) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
- const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
- const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none);
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
+
+ const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
+ const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -322,33 +321,27 @@ TEST_F(TransactionParticipantRetryableWritesTest,
ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
}
TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
-
- const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
- assertTxnRecord(&session, 100, 0, firstOpTime, boost::none);
- const auto secondOpTime =
- writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
- assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
- const auto thirdOpTime =
- writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
- assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
- const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none);
- assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none);
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+
+ const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
+ assertTxnRecord(100, 0, firstOpTime, boost::none);
+ const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecord(200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
+ const auto thirdOpTime = writeTxnRecord(300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
+ assertTxnRecord(300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
+ const auto fourthOpTime = writeTxnRecord(400, 3, thirdOpTime, boost::none);
+ assertTxnRecord(400, 3, fourthOpTime, boost::none);
}
TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 20;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -360,10 +353,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
}
TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
// Drop the transactions table
BSONObj dropResult;
@@ -385,29 +378,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN
}
TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none);
+ const auto firstOpTime = writeTxnRecord(txnNum, 1000, {}, boost::none);
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
- writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none);
+ writeTxnRecord(txnNum, 2000, firstOpTime, boost::none);
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
// Invalidate the session and ensure the statements still check out
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
@@ -417,10 +408,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
}
TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -432,9 +421,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTr
TEST_F(TransactionParticipantRetryableWritesTest,
CheckStatementExecutedForInvalidatedTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->invalidate();
ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0),
@@ -444,11 +431,10 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryForOldTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -476,10 +462,8 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -487,7 +471,7 @@ TEST_F(TransactionParticipantRetryableWritesTest,
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
txnParticipant->invalidate();
@@ -499,10 +483,8 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -511,7 +493,7 @@ TEST_F(TransactionParticipantRetryableWritesTest,
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
@@ -520,12 +502,12 @@ TEST_F(TransactionParticipantRetryableWritesTest,
wuow.commit();
}
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0));
}
TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) {
- const auto sessionId = makeLogicalSessionIdForTest();
+ const auto sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 2;
{
@@ -576,9 +558,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
}());
}
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0),
AssertionException,
@@ -595,18 +576,17 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
const auto uuid = UUID::gen();
- const auto sessionId = makeLogicalSessionIdForTest();
+ const auto sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 2;
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
+
OperationSessionInfo osi;
osi.setSessionId(sessionId);
osi.setTxnNumber(txnNum);
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
- txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
-
auto firstOpTime = ([&]() {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -671,7 +651,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
// Should have the same behavior after loading state from storage.
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
{
auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1);