summaryrefslogtreecommitdiff
path: root/src/mongo/db/session_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/session_test.cpp')
-rw-r--r--src/mongo/db/session_test.cpp135
1 files changed, 87 insertions, 48 deletions
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index e32c60f9d1b..1b8845011c3 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -182,6 +182,52 @@ protected:
OplogSlot());
}
+ repl::OpTime writeTxnRecord(Session* session,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ repl::OpTime prevOpTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ session->beginOrContinueTxn(opCtx(), txnNum);
+
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime =
+ logOp(opCtx(), kNss, session->getSessionId(), txnNum, stmtId, prevOpTime);
+ session->onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {stmtId}, opTime, Date_t::now(), txnState);
+ wuow.commit();
+
+ return opTime;
+ }
+
+ void assertTxnRecord(Session* session,
+ TxnNumber txnNum,
+ StmtId stmtId,
+ repl::OpTime opTime,
+ boost::optional<DurableTxnStateEnum> txnState) {
+ DBDirectClient client(opCtx());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON("_id" << session->getSessionId().toBSON())});
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecordObj = cursor->next();
+ auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), txnRecordObj);
+ ASSERT(!cursor->more());
+ ASSERT_EQ(session->getSessionId(), txnRecord.getSessionId());
+ ASSERT_EQ(txnNum, txnRecord.getTxnNum());
+ ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT(txnRecord.getState() == txnState);
+ ASSERT_EQ(txnState != boost::none,
+ txnRecordObj.hasField(SessionTxnRecord::kStateFieldName));
+ ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
+
+ session->invalidate();
+ session->refreshFromStorageIfNeeded(opCtx());
+ ASSERT_EQ(opTime, session->getLastWriteOpTime(txnNum));
+ }
+
OpObserverMock* _opObserver = nullptr;
};
@@ -211,15 +257,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
const TxnNumber txnNum = 21;
session.beginOrContinueTxn(opCtx(), txnNum);
- const auto opTime = [&] {
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- }();
+ const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -233,6 +271,7 @@ TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime());
+ ASSERT(!txnRecord.getState());
ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum));
}
@@ -241,20 +280,8 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
Session session(sessionId);
session.refreshFromStorageIfNeeded(opCtx());
- const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) {
- session.beginOrContinueTxn(opCtx(), txnNum);
-
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- };
-
- const auto firstOpTime = writeTxnRecordFn(100, 0, {});
- const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime);
+ const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
+ const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
@@ -268,6 +295,7 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(200, txnRecord.getTxnNum());
ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime());
+ ASSERT(!txnRecord.getState());
ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
session.invalidate();
@@ -275,6 +303,23 @@ TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200));
}
+TEST_F(SessionTest, TransactionTableUpdatesReplaceEntireDocument) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
+
+ const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
+ assertTxnRecord(&session, 100, 0, firstOpTime, boost::none);
+ const auto secondOpTime =
+ writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
+ const auto thirdOpTime =
+ writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
+ assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
+ const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none);
+ assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none);
+}
+
TEST_F(SessionTest, StartingOldTxnShouldAssert) {
const auto sessionId = makeLogicalSessionIdForTest();
Session session(sessionId);
@@ -306,7 +351,8 @@ TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
AssertionException);
}
@@ -318,25 +364,15 @@ TEST_F(SessionTest, CheckStatementExecuted) {
const TxnNumber txnNum = 100;
session.beginOrContinueTxn(opCtx(), txnNum);
- const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) {
- AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
- WriteUnitOfWork wuow(opCtx());
- const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now());
- wuow.commit();
-
- return opTime;
- };
-
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- const auto firstOpTime = writeTxnRecordFn(1000, {});
+ const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none);
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
- writeTxnRecordFn(2000, firstOpTime);
+ writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none);
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
@@ -386,7 +422,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
wuow.commit();
}
@@ -394,10 +431,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum - 1, {0}, opTime, Date_t::now(), boost::none),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
}
@@ -415,10 +452,10 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
session.invalidate();
- ASSERT_THROWS_CODE(
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()),
- AssertionException,
- ErrorCodes::ConflictingOperationInProgress);
+ ASSERT_THROWS_CODE(session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
@@ -433,7 +470,8 @@ TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now());
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
session.invalidate();
@@ -543,7 +581,8 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
{},
false /* prepare */,
OplogSlot());
- session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime);
+ session.onWriteOpCompletedOnPrimary(
+ opCtx(), txnNum, {1}, opTime, wallClockTime, boost::none);
wuow.commit();
return opTime;
@@ -573,7 +612,7 @@ TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
OplogSlot());
session.onWriteOpCompletedOnPrimary(
- opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime);
+ opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime, boost::none);
wuow.commit();
}