diff options
author | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-09-11 10:47:44 -0400 |
---|---|---|
committer | Kaloian Manassiev <kaloian.manassiev@mongodb.com> | 2017-09-13 07:40:43 -0400 |
commit | 81111ce63ba47253c9bc3b44c84a4f452fff9f90 (patch) | |
tree | 6f9f7f88869a7de3b6013482d2f571c0b107e7a9 /src/mongo/db/session_test.cpp | |
parent | bf8ae1b3edbf7470e92b7407f76a042cc2246d48 (diff) | |
download | mongo-81111ce63ba47253c9bc3b44c84a4f452fff9f90.tar.gz |
SERVER-30325 Simplify session transaction state maintenance
This change exposes a single 'onWriteCompleted' method on the Session
object, which hides all the concurrency control and the session cache
maintenance.
Diffstat (limited to 'src/mongo/db/session_test.cpp')
-rw-r--r-- | src/mongo/db/session_test.cpp | 665 |
1 files changed, 233 insertions, 432 deletions
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp index f9864e8ad2f..3c3560c19da 100644 --- a/src/mongo/db/session_test.cpp +++ b/src/mongo/db/session_test.cpp @@ -28,11 +28,7 @@ #include "mongo/platform/basic.h" -#include <memory> - -#include "mongo/base/init.h" #include "mongo/db/client.h" -#include "mongo/db/curop.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" @@ -40,19 +36,21 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" -#include "mongo/db/repl/replication_coordinator_mock.h" #include "mongo/db/service_context.h" -#include "mongo/db/service_context_d_test_fixture.h" #include "mongo/db/session_catalog.h" +#include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" namespace mongo { namespace { +const NamespaceString kNss("TestDB", "TestColl"); +const OptionalCollectionUUID kUUID; + class SessionTest : public MockReplCoordServerFixture { -public: - void setUp() override { +protected: + void setUp() final { MockReplCoordServerFixture::setUp(); auto service = opCtx()->getServiceContext(); @@ -60,528 +58,331 @@ public: SessionCatalog::create(service); SessionCatalog::get(service)->onStepUp(opCtx()); } -}; - -TEST_F(SessionTest, CanCreateNewSessionEntry) { - const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull()); - - DBDirectClient client(opCtx()); - Query queryAll; - queryAll.sort(BSON("_id" << 1)); - - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("CanCreateNewSessionEntry"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); + SessionCatalog* catalog() { + return SessionCatalog::get(opCtx()->getServiceContext()); + } - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull()); + static repl::OpTime logOp(OperationContext* opCtx, + const NamespaceString& nss, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + StmtId stmtId) { + return logOp(opCtx, nss, lsid, txnNumber, stmtId, Timestamp()); + } - ASSERT_FALSE(cursor->more()); -} + static repl::OpTime logOp(OperationContext* opCtx, + const NamespaceString& nss, + const LogicalSessionId& lsid, + TxnNumber txnNumber, + StmtId stmtId, + Timestamp prevTs) { + OperationSessionInfo osi; + osi.setSessionId(lsid); + osi.setTxnNumber(txnNumber); + + repl::OplogLink link; + link.prevTs = prevTs; + + return repl::logOp( + opCtx, "n", nss, kUUID, BSON("TestValue" << 0), nullptr, false, osi, stmtId, link); + } +}; -TEST_F(SessionTest, StartingOldTxnShouldAssert) { +TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - ASSERT_THROWS(txnState.begin(opCtx(), txnNum - 1), AssertionException); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull()); -} + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); -TEST_F(SessionTest, StartingNewSessionWithCompatibleEntryInStorage) { - const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 20; - const Timestamp origTs(985, 15); + session.beginTxn(opCtx(), txnNum); - SessionTxnRecord origRecord; - origRecord.setSessionId(sessionId); - origRecord.setTxnNum(txnNum); - origRecord.setLastWriteOpTimeTs(origTs); + ASSERT_EQ(sessionId, session.getSessionId()); + ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull()); DBDirectClient client(opCtx()); - client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON()); - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_EQ(origTs, txnState.getLastWriteOpTimeTs()); - - // Confirm that nothing changed in storage. - - Query queryAll; - queryAll.sort(BSON("_id" << 1)); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("StartingNewSessionWithCompatibleEntryInStorage"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); - - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(origTs, txnRecord.getLastWriteOpTimeTs()); - - ASSERT_FALSE(cursor->more()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON("_id" << sessionId.toBSON())}); + ASSERT(cursor); + ASSERT(!cursor->more()); } -TEST_F(SessionTest, StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry) { +TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { const auto sessionId = makeLogicalSessionIdForTest(); - TxnNumber txnNum = 20; - const Timestamp origTs(985, 15); - - SessionTxnRecord origRecord; - origRecord.setSessionId(sessionId); - origRecord.setTxnNum(txnNum); - origRecord.setLastWriteOpTimeTs(origTs); - - DBDirectClient client(opCtx()); - client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON()); - - Session txnState(sessionId); - txnState.begin(opCtx(), ++txnNum); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull()); - - // Confirm that entry has new txn and ts reset to zero in storage. - - Query queryAll; - queryAll.sort(BSON("_id" << 1)); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); - - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull()); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - ASSERT_FALSE(cursor->more()); -} + const TxnNumber txnNum = 21; + session.beginTxn(opCtx(), txnNum); -TEST_F(SessionTest, StartingNewSessionWithNewerEntryInStorageShouldAssert) { - const auto sessionId = makeLogicalSessionIdForTest(); - TxnNumber txnNum = 20; - const Timestamp origTs(985, 15); + const auto opTime = [&] { + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); + wuow.commit(); - SessionTxnRecord origRecord; - origRecord.setSessionId(sessionId); - origRecord.setTxnNum(txnNum); - origRecord.setLastWriteOpTimeTs(origTs); + return opTime; + }(); DBDirectClient client(opCtx()); - client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON()); - - Session txnState(sessionId); - ASSERT_THROWS(txnState.begin(opCtx(), txnNum - 1), AssertionException); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_EQ(origTs, txnState.getLastWriteOpTimeTs()); - - // Confirm that nothing changed in storage. - - Query queryAll; - queryAll.sort(BSON("_id" << 1)); - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); - + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON("_id" << sessionId.toBSON())}); + ASSERT(cursor); + ASSERT(cursor->more()); + + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next()); + ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(origTs, txnRecord.getLastWriteOpTimeTs()); - - ASSERT_FALSE(cursor->more()); + ASSERT_EQ(opTime.getTimestamp(), txnRecord.getLastWriteOpTimeTs()); + ASSERT_EQ(opTime.getTimestamp(), session.getLastWriteOpTimeTs(txnNum)); } -TEST_F(SessionTest, StoreOpTime) { +TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - const Timestamp ts1(100, 42); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); + const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, Timestamp prevTs) { + session.beginTxn(opCtx(), txnNum); - { - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - - txnState.saveTxnProgress(opCtx(), ts1); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp()); wuow.commit(); - } - DBDirectClient client(opCtx()); - Query queryAll; - queryAll.sort(BSON("_id" << 1)); + return opTime.getTimestamp(); + }; - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("StoreOpTime 1"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); + const auto firstTs = writeTxnRecordFn(100, 0, Timestamp()); + const auto secondTs = writeTxnRecordFn(200, 1, firstTs); + DBDirectClient client(opCtx()); + auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), + {BSON("_id" << sessionId.toBSON())}); + ASSERT(cursor); + ASSERT(cursor->more()); + + auto txnRecord = SessionTxnRecord::parse( + IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next()); + ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs()); - - ASSERT_FALSE(cursor->more()); + ASSERT_EQ(200, txnRecord.getTxnNum()); + ASSERT_EQ(secondTs, txnRecord.getLastWriteOpTimeTs()); + ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200)); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_EQ(ts1, txnState.getLastWriteOpTimeTs()); + session.invalidate(); + session.refreshFromStorageIfNeeded(opCtx()); + ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200)); +} - const Timestamp ts2(200, 23); - { - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); - WriteUnitOfWork wuow(opCtx()); +TEST_F(SessionTest, StartingOldTxnShouldAssert) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - txnState.saveTxnProgress(opCtx(), ts2); - wuow.commit(); - } + const TxnNumber txnNum = 20; + session.beginTxn(opCtx(), txnNum); - cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); + ASSERT_THROWS_CODE( + session.beginTxn(opCtx(), txnNum - 1), AssertionException, ErrorCodes::TransactionTooOld); + ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull()); +} - ASSERT_TRUE(cursor->more()); - doc = cursor->next(); - IDLParserErrorContext ctx2("StoreOpTime 2"); - txnRecord = SessionTxnRecord::parse(ctx2, doc); +TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(ts2, txnRecord.getLastWriteOpTimeTs()); + // Drop the transactions table + BSONObj dropResult; + DBDirectClient client(opCtx()); + const auto& nss = NamespaceString::kSessionTransactionsTableNamespace; + ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult)); - ASSERT_FALSE(cursor->more()); + const TxnNumber txnNum = 21; + session.beginTxn(opCtx(), txnNum); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_EQ(ts2, txnState.getLastWriteOpTimeTs()); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); + ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()), + AssertionException); } -TEST_F(SessionTest, CanBumpTransactionIdIfNewer) { +TEST_F(SessionTest, CheckStatementExecuted) { const auto sessionId = makeLogicalSessionIdForTest(); - TxnNumber txnNum = 20; - const Timestamp ts1(100, 42); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); - { - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); + const auto writeTxnRecordFn = [&](StmtId stmtId, Timestamp prevTs) { + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - - txnState.saveTxnProgress(opCtx(), ts1); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp()); wuow.commit(); - } - - DBDirectClient client(opCtx()); - Query queryAll; - queryAll.sort(BSON("_id" << 1)); - - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - auto doc = cursor->next(); - IDLParserErrorContext ctx1("CanBumpTransactionIdIfNewer 1"); - auto txnRecord = SessionTxnRecord::parse(ctx1, doc); - - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs()); - - ASSERT_FALSE(cursor->more()); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_EQ(ts1, txnState.getLastWriteOpTimeTs()); + return opTime.getTimestamp(); + }; - // Start a new transaction on the same session. - txnState.begin(opCtx(), ++txnNum); + ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); + const auto firstTs = writeTxnRecordFn(1000, Timestamp()); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); - cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); + ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); + writeTxnRecordFn(2000, firstTs); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); - ASSERT_TRUE(cursor->more()); - doc = cursor->next(); - IDLParserErrorContext ctx2("CanBumpTransactionIdIfNewer 2"); - txnRecord = SessionTxnRecord::parse(ctx2, doc); + // Invalidate the session and ensure the statements still check out + session.invalidate(); + session.refreshFromStorageIfNeeded(opCtx()); - ASSERT_EQ(sessionId, txnRecord.getSessionId()); - ASSERT_EQ(txnNum, txnRecord.getTxnNum()); - ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull()); - - ASSERT_FALSE(cursor->more()); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull()); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); } -TEST_F(SessionTest, StartingNewSessionWithDroppedTableShouldAssert) { +TEST_F(SessionTest, CheckStatementExecutedForOldTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - - const auto& ns = NamespaceString::kSessionTransactionsTableNamespace; - - BSONObj dropResult; - DBDirectClient client(opCtx()); - ASSERT_TRUE(client.runCommand(ns.db().toString(), BSON("drop" << ns.coll()), dropResult)); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - Session txnState(sessionId); - ASSERT_THROWS(txnState.begin(opCtx(), txnNum), AssertionException); + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); - ASSERT_EQ(sessionId, txnState.getSessionId()); + ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum - 1, 0), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, SaveTxnProgressShouldAssertIfTableIsDropped) { +TEST_F(SessionTest, CheckStatementExecutedForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - const Timestamp ts1(100, 42); - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - const auto& ns = NamespaceString::kSessionTransactionsTableNamespace; - - BSONObj dropResult; - DBDirectClient client(opCtx()); - ASSERT_TRUE(client.runCommand(ns.db().toString(), BSON("drop" << ns.coll()), dropResult)); - - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); - WriteUnitOfWork wuow(opCtx()); + Session session(sessionId); + session.invalidate(); - ASSERT_THROWS(txnState.saveTxnProgress(opCtx(), ts1), AssertionException); + ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), 100, 0), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, TwoSessionsShouldBeIndependent) { - const auto sessionId1 = makeLogicalSessionIdForTest(); - const TxnNumber txnNum1 = 20; - const Timestamp ts1(1903, 42); - - Session txnState1(sessionId1); - txnState1.begin(opCtx(), txnNum1); - - const auto sessionId2 = makeLogicalSessionIdForTest(); - const TxnNumber txnNum2 = 300; - const Timestamp ts2(671, 5); +TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { + const auto sessionId = makeLogicalSessionIdForTest(); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - Session txnState2(sessionId2); - txnState2.begin(opCtx(), txnNum2); + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); { - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - - txnState2.saveTxnProgress(opCtx(), ts2); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); wuow.commit(); } { - AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); - - txnState1.saveTxnProgress(opCtx(), ts1); - wuow.commit(); - } - - DBDirectClient client(opCtx()); - Query queryAll; - queryAll.sort(BSON(SessionTxnRecord::kTxnNumFieldName << 1)); - - auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll); - ASSERT_TRUE(cursor.get() != nullptr); - - ASSERT_TRUE(cursor->more()); - - { - auto doc = cursor->next(); - IDLParserErrorContext ctx("TwoSessionsShouldBeIndependent 1"); - auto txnRecord = SessionTxnRecord::parse(ctx, doc); - - ASSERT_EQ(sessionId1, txnRecord.getSessionId()); - ASSERT_EQ(txnNum1, txnRecord.getTxnNum()); - ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime.getTimestamp()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } - - ASSERT_TRUE(cursor->more()); - - { - auto doc = cursor->next(); - IDLParserErrorContext ctx("TwoSessionsShouldBeIndependent 2"); - auto txnRecord = SessionTxnRecord::parse(ctx, doc); - - ASSERT_EQ(sessionId2, txnRecord.getSessionId()); - ASSERT_EQ(txnNum2, txnRecord.getTxnNum()); - ASSERT_EQ(ts2, txnRecord.getLastWriteOpTimeTs()); - } - - ASSERT_FALSE(cursor->more()); } -TEST_F(SessionTest, CheckStatementExecuted) { +TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - const StmtId stmtId = 5; - - opCtx()->setLogicalSessionId(sessionId); - opCtx()->setTxnNumber(txnNum); - Session session(sessionId); - session.begin(opCtx(), txnNum); - - // Returns nothing if the statement has not been executed. - auto fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId); - ASSERT_FALSE(fetchedEntry); - - // Returns the correct oplog entry if the statement has completed. - auto optimeTs = Timestamp(50, 10); - - OperationSessionInfo opSessionInfo; - opSessionInfo.setSessionId(sessionId); - opSessionInfo.setTxnNumber(txnNum); - - repl::OplogEntry oplogEntry(repl::OpTime(optimeTs, 1), - 0, - repl::OpTypeEnum::kInsert, - NamespaceString("a.b"), - 0, - BSON("_id" << 1 << "x" << 5)); - oplogEntry.setOperationSessionInfo(opSessionInfo); - oplogEntry.setStatementId(stmtId); - oplogEntry.setPrevWriteTsInTransaction(Timestamp(0, 0)); - insertOplogEntry(oplogEntry); - - { - AutoGetCollection autoColl(opCtx(), NamespaceString("a.b"), MODE_IX); - WriteUnitOfWork wuow(opCtx()); + session.refreshFromStorageIfNeeded(opCtx()); - session.saveTxnProgress(opCtx(), optimeTs); - wuow.commit(); - } + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); - fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId); - ASSERT_TRUE(fetchedEntry); - ASSERT_EQ(fetchedEntry->getStatementId().get(), stmtId); - - // Still returns nothing for uncompleted statements. - auto uncompletedStmtId = 10; - fetchedEntry = session.checkStatementExecuted(opCtx(), uncompletedStmtId); - ASSERT_FALSE(fetchedEntry); -} - -TEST_F(SessionTest, BeginReloadsStateAfterReset) { - const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); - - const TxnNumber newTxnNum = 50; - const auto newTs = Timestamp(1, 1); - Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, newTs); - txnState.reset(); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); - txnState.begin(opCtx(), newTxnNum); + session.invalidate(); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(newTxnNum, txnState.getTxnNum()); - ASSERT_EQ(txnState.getLastWriteOpTimeTs(), newTs); + ASSERT_THROWS_CODE( + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } -TEST_F(SessionTest, BeginDoesNotReloadWithoutReset) { +// TODO: These tests require the storage engine used for testing to support document-level +// concurrency control +#if (0) +TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresOldTransaction) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; - - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); - - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); - - const TxnNumber newTxnNum = 30; - const auto newTs = Timestamp(1, 1); - Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, newTs); + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - txnState.begin(opCtx(), txnNum); + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); -} + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); -TEST_F(SessionTest, StartingOldTxnFailsAfterReset) { - const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber oldTxnNum = 20; + { + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); + } - Session txnState(sessionId); - txnState.begin(opCtx(), oldTxnNum); + // Mimics a different thread starting a newer transaction on the same session + const TxnNumber newTxnNum = 200; + + stdx::async(stdx::launch::async, [&sessionId, &session, newTxnNum] { + Client::initThreadIfNotAlready(); + auto sideOpCtx = Client::getCurrent()->makeOperationContext(); + AutoGetCollection autoColl(sideOpCtx.get(), kNss, MODE_IX); + WriteUnitOfWork wuow(sideOpCtx.get()); + const auto opTime = logOp(sideOpCtx.get(), kNss, sessionId, newTxnNum, 1000); + session.onWriteOpCompletedOnPrimary( + sideOpCtx.get(), newTxnNum, {1000}, opTime.getTimestamp()); + wuow.commit(); + }).get(); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(oldTxnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); + wuow.commit(); - const TxnNumber newTxnNum = 30; - Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, Timestamp()); - txnState.reset(); + // The newer transaction must win + ASSERT(!session.checkStatementExecuted(opCtx(), newTxnNum, 0)); + ASSERT(session.checkStatementExecuted(opCtx(), newTxnNum, 1000)); - ASSERT_THROWS(txnState.begin(opCtx(), oldTxnNum), AssertionException); + ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0), + AssertionException, + ErrorCodes::ConflictingOperationInProgress); } +#endif -TEST_F(SessionTest, CanStartLaterTxnAfterReset) { +TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { const auto sessionId = makeLogicalSessionIdForTest(); - const TxnNumber txnNum = 20; + Session session(sessionId); + session.refreshFromStorageIfNeeded(opCtx()); - Session txnState(sessionId); - txnState.begin(opCtx(), txnNum); + const TxnNumber txnNum = 100; + session.beginTxn(opCtx(), txnNum); - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(txnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); + { + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); + WriteUnitOfWork wuow(opCtx()); + const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); + session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()); - txnState.reset(); + session.invalidate(); - const TxnNumber newTxnNum = 40; - txnState.begin(opCtx(), newTxnNum); + wuow.commit(); + } - ASSERT_EQ(sessionId, txnState.getSessionId()); - ASSERT_EQ(newTxnNum, txnState.getTxnNum()); - ASSERT(txnState.getLastWriteOpTimeTs().isNull()); + session.refreshFromStorageIfNeeded(opCtx()); + ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0)); } } // namespace |