diff options
Diffstat (limited to 'src/mongo/db/transaction_participant_retryable_writes_test.cpp')
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 170 |
1 files changed, 75 insertions, 95 deletions
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 5914862a591..f3900a43b5f 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -145,15 +145,16 @@ protected: MongoDSessionCatalog::onStepUp(opCtx()); const auto service = opCtx()->getServiceContext(); - OpObserverRegistry* opObserverRegistry = - dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); - auto mockObserver = stdx::make_unique<OpObserverMock>(); - _opObserver = mockObserver.get(); - opObserverRegistry->addObserver(std::move(mockObserver)); + + const auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); + opObserverRegistry->addObserver(stdx::make_unique<OpObserverMock>()); + + opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); + _opContextSession.emplace(opCtx()); } void tearDown() final { - _opObserver = nullptr; + _opContextSession.reset(); MockReplCoordServerFixture::tearDown(); } @@ -196,16 +197,16 @@ protected: OplogSlot()); } - repl::OpTime writeTxnRecord(Session* session, - TxnNumber txnNum, + repl::OpTime writeTxnRecord(TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime, boost::optional<DurableTxnStateEnum> txnState) { - const auto uuid = UUID::gen(); - + const auto session = OperationContextSession::get(opCtx()); const auto txnParticipant = TransactionParticipant::get(session); txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); + const auto uuid = UUID::gen(); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = @@ -217,11 +218,12 @@ protected: return opTime; } - void assertTxnRecord(Session* session, - TxnNumber txnNum, + void assertTxnRecord(TxnNumber txnNum, StmtId stmtId, repl::OpTime opTime, boost::optional<DurableTxnStateEnum> txnState) { + const auto session = OperationContextSession::get(opCtx()); + DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, {BSON("_id" << session->getSessionId().toBSON())}); @@ -243,23 +245,21 @@ protected: ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); } - OpObserverMock* _opObserver = nullptr; +private: + boost::optional<OperationContextSession> _opContextSession; }; TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto& sessionId = *opCtx()->getLogicalSessionId(); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 20; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - - ASSERT_EQ(sessionId, session.getSessionId()); ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); DBDirectClient client(opCtx()); @@ -270,15 +270,14 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) } TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 21; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none); + const auto opTime = writeTxnRecord(txnNum, 0, {}, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -298,13 +297,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit TEST_F(TransactionParticipantRetryableWritesTest, StartingNewerTransactionUpdatesThePersistedSession) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); - const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); - const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none); + const auto& sessionId = *opCtx()->getLogicalSessionId(); + + const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none); + const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -322,33 +321,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); } TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); - - const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); - assertTxnRecord(&session, 100, 0, firstOpTime, boost::none); - const auto secondOpTime = - writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); - assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); - const auto thirdOpTime = - writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); - assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); - const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none); - assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + + const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none); + assertTxnRecord(100, 0, firstOpTime, boost::none); + const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecord(200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); + const auto thirdOpTime = writeTxnRecord(300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); + assertTxnRecord(300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); + const auto fourthOpTime = writeTxnRecord(400, 3, thirdOpTime, boost::none); + assertTxnRecord(400, 3, fourthOpTime, boost::none); } TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 20; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -360,10 +353,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { } TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + + const auto& sessionId = *opCtx()->getLogicalSessionId(); // Drop the transactions table BSONObj dropResult; @@ -385,29 +378,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN } TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none); + const auto firstOpTime = writeTxnRecord(txnNum, 1000, {}, boost::none); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); - writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none); + writeTxnRecord(txnNum, 2000, firstOpTime, boost::none); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); // Invalidate the session and ensure the statements still check out txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); @@ -417,10 +408,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { } TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -432,9 +421,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTr TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForInvalidatedTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); + const auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->invalidate(); ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0), @@ -444,11 +431,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -476,10 +462,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -487,7 +471,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto uuid = UUID::gen(); - const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); + const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); txnParticipant->invalidate(); @@ -499,10 +483,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -511,7 +493,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto uuid = UUID::gen(); - const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); + const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); @@ -520,12 +502,12 @@ TEST_F(TransactionParticipantRetryableWritesTest, wuow.commit(); } - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0)); } TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { - const auto sessionId = makeLogicalSessionIdForTest(); + const auto sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 2; { @@ -576,9 +558,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru }()); } - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0), AssertionException, @@ -595,18 +576,17 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { const auto uuid = UUID::gen(); - const auto sessionId = makeLogicalSessionIdForTest(); + const auto sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 2; + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); + OperationSessionInfo osi; osi.setSessionId(sessionId); osi.setTxnNumber(txnNum); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); - txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - auto firstOpTime = ([&]() { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -671,7 +651,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke // Should have the same behavior after loading state from storage. txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); { auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1); |