diff options
Diffstat (limited to 'src/mongo/db/session_test.cpp')
-rw-r--r-- | src/mongo/db/session_test.cpp | 135 |
1 files changed, 87 insertions, 48 deletions
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index e32c60f9d1b..1b8845011c3 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -182,6 +182,52 @@ protected: OplogSlot()); } + repl::OpTime writeTxnRecord(Session* session, + TxnNumber txnNum, + StmtId stmtId, + repl::OpTime prevOpTime, + boost::optional<DurableTxnStateEnum> txnState) { + session->beginOrContinueTxn(opCtx(), txnNum); + + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = + logOp(opCtx(), kNss, session->getSessionId(), txnNum, stmtId, prevOpTime); + session->onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState); + wuow.commit(); + + return opTime; + } + + void assertTxnRecord(Session* session, + TxnNumber txnNum, + StmtId stmtId, + repl::OpTime opTime, + boost::optional<DurableTxnStateEnum> txnState) { + DBDirectClient client(opCtx()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON("_id" << session->getSessionId().toBSON())}); + ASSERT(cursor); + ASSERT(cursor->more()); + + auto txnRecordObj = cursor->next(); + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), txnRecordObj); + ASSERT(!cursor->more()); + ASSERT_EQ(session->getSessionId(), txnRecord.getSessionId()); + ASSERT_EQ(txnNum, txnRecord.getTxnNum()); + ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); + ASSERT(txnRecord.getState() == txnState); + ASSERT_EQ(txnState != boost::none, + txnRecordObj.hasField(SessionTxnRecord::kStateFieldName)); + ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); + + session->invalidate(); + session->refreshFromStorageIfNeeded(opCtx()); + ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum)); + } + OpObserverMock* _opObserver = nullptr; }; @@ -211,15 +257,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { const TxnNumber txnNum = 21; session.beginOrContinueTxn(opCtx(), txnNum); - const auto opTime = [&] { - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }(); + const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -233,6 +271,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(txnNum, txnRecord.getTxnNum()); ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); + ASSERT(!txnRecord.getState()); ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum)); } @@ -241,20 +280,8 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); - const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) { - session.beginOrContinueTxn(opCtx(), txnNum); - - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }; - - const auto firstOpTime = writeTxnRecordFn(100, 0, {}); - const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime); + const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); + const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), @@ -268,6 +295,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(200, txnRecord.getTxnNum()); ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); + ASSERT(!txnRecord.getState()); ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); session.invalidate(); @@ -275,6 +303,23 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); } +TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); + + const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); + assertTxnRecord(&session, 100, 0, firstOpTime, boost::none); + const auto secondOpTime = + writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); + const auto thirdOpTime = + writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); + assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); + const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none); + assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none); +} + TEST_F(SessionTest, StartingOldTxnShouldAssert) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); @@ -306,7 +351,8 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), AssertionException); } @@ -318,25 +364,15 @@ TEST_F(SessionTest, CheckStatementExecuted) { const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum); - const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) { - AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); - WriteUnitOfWork wuow(opCtx()); - const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); - wuow.commit(); - - return opTime; - }; - ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - const auto firstOpTime = writeTxnRecordFn(1000, {}); + const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); - writeTxnRecordFn(2000, firstOpTime); + writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); @@ -386,7 +422,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); wuow.commit(); } @@ -394,10 +431,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } } @@ -415,10 +452,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { session.invalidate(); - ASSERT_THROWS_CODE( - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), - AssertionException, - ErrorCodes::ConflictingOperationInProgress); + ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { @@ -433,7 +470,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); session.invalidate(); @@ -543,7 +581,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { {}, false /* prepare */, OplogSlot()); - session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime); + session.onWriteOpCompletedOnPrimary( + opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none); wuow.commit(); return opTime; @@ -573,7 +612,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { OplogSlot()); session.onWriteOpCompletedOnPrimary( - opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime); + opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none); wuow.commit(); } |