summaryrefslogtreecommitdiff
path: root/src/mongo/db/session_test.cpp
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-11 10:47:44 -0400
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2017-09-13 07:40:43 -0400
commit81111ce63ba47253c9bc3b44c84a4f452fff9f90 (patch)
tree6f9f7f88869a7de3b6013482d2f571c0b107e7a9 /src/mongo/db/session_test.cpp
parentbf8ae1b3edbf7470e92b7407f76a042cc2246d48 (diff)
downloadmongo-81111ce63ba47253c9bc3b44c84a4f452fff9f90.tar.gz
SERVER-30325 Simplify session transaction state maintenance
This change exposes a single 'onWriteCompleted' method on the Session object, which hides all the concurrency control and the session cache maintenance.
Diffstat (limited to 'src/mongo/db/session_test.cpp')
-rw-r--r--src/mongo/db/session_test.cpp665
1 files changed, 233 insertions, 432 deletions
diff --git a/src/mongo/db/session_test.cpp b/src/mongo/db/session_test.cpp
index f9864e8ad2f..3c3560c19da 100644
--- a/src/mongo/db/session_test.cpp
+++ b/src/mongo/db/session_test.cpp
@@ -28,11 +28,7 @@
#include "mongo/platform/basic.h"
-#include <memory>
-
-#include "mongo/base/init.h"
#include "mongo/db/client.h"
-#include "mongo/db/curop.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/operation_context.h"
@@ -40,19 +36,21 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
-#include "mongo/db/repl/replication_coordinator_mock.h"
#include "mongo/db/service_context.h"
-#include "mongo/db/service_context_d_test_fixture.h"
#include "mongo/db/session_catalog.h"
+#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/unittest.h"
namespace mongo {
namespace {
+const NamespaceString kNss("TestDB", "TestColl");
+const OptionalCollectionUUID kUUID;
+
class SessionTest : public MockReplCoordServerFixture {
-public:
- void setUp() override {
+protected:
+ void setUp() final {
MockReplCoordServerFixture::setUp();
auto service = opCtx()->getServiceContext();
@@ -60,528 +58,331 @@ public:
SessionCatalog::create(service);
SessionCatalog::get(service)->onStepUp(opCtx());
}
-};
-
-TEST_F(SessionTest, CanCreateNewSessionEntry) {
- const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull());
-
- DBDirectClient client(opCtx());
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
-
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("CanCreateNewSessionEntry");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
+ SessionCatalog* catalog() {
+ return SessionCatalog::get(opCtx()->getServiceContext());
+ }
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull());
+ static repl::OpTime logOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ StmtId stmtId) {
+ return logOp(opCtx, nss, lsid, txnNumber, stmtId, Timestamp());
+ }
- ASSERT_FALSE(cursor->more());
-}
+ static repl::OpTime logOp(OperationContext* opCtx,
+ const NamespaceString& nss,
+ const LogicalSessionId& lsid,
+ TxnNumber txnNumber,
+ StmtId stmtId,
+ Timestamp prevTs) {
+ OperationSessionInfo osi;
+ osi.setSessionId(lsid);
+ osi.setTxnNumber(txnNumber);
+
+ repl::OplogLink link;
+ link.prevTs = prevTs;
+
+ return repl::logOp(
+ opCtx, "n", nss, kUUID, BSON("TestValue" << 0), nullptr, false, osi, stmtId, link);
+ }
+};
-TEST_F(SessionTest, StartingOldTxnShouldAssert) {
+TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- ASSERT_THROWS(txnState.begin(opCtx(), txnNum - 1), AssertionException);
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull());
-}
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
-TEST_F(SessionTest, StartingNewSessionWithCompatibleEntryInStorage) {
- const auto sessionId = makeLogicalSessionIdForTest();
const TxnNumber txnNum = 20;
- const Timestamp origTs(985, 15);
+ session.beginTxn(opCtx(), txnNum);
- SessionTxnRecord origRecord;
- origRecord.setSessionId(sessionId);
- origRecord.setTxnNum(txnNum);
- origRecord.setLastWriteOpTimeTs(origTs);
+ ASSERT_EQ(sessionId, session.getSessionId());
+ ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull());
DBDirectClient client(opCtx());
- client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON());
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_EQ(origTs, txnState.getLastWriteOpTimeTs());
-
- // Confirm that nothing changed in storage.
-
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("StartingNewSessionWithCompatibleEntryInStorage");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
-
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(origTs, txnRecord.getLastWriteOpTimeTs());
-
- ASSERT_FALSE(cursor->more());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON("_id" << sessionId.toBSON())});
+ ASSERT(cursor);
+ ASSERT(!cursor->more());
}
-TEST_F(SessionTest, StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry) {
+TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) {
const auto sessionId = makeLogicalSessionIdForTest();
- TxnNumber txnNum = 20;
- const Timestamp origTs(985, 15);
-
- SessionTxnRecord origRecord;
- origRecord.setSessionId(sessionId);
- origRecord.setTxnNum(txnNum);
- origRecord.setLastWriteOpTimeTs(origTs);
-
- DBDirectClient client(opCtx());
- client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON());
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), ++txnNum);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull());
-
- // Confirm that entry has new txn and ts reset to zero in storage.
-
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
-
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull());
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- ASSERT_FALSE(cursor->more());
-}
+ const TxnNumber txnNum = 21;
+ session.beginTxn(opCtx(), txnNum);
-TEST_F(SessionTest, StartingNewSessionWithNewerEntryInStorageShouldAssert) {
- const auto sessionId = makeLogicalSessionIdForTest();
- TxnNumber txnNum = 20;
- const Timestamp origTs(985, 15);
+ const auto opTime = [&] {
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
+ wuow.commit();
- SessionTxnRecord origRecord;
- origRecord.setSessionId(sessionId);
- origRecord.setTxnNum(txnNum);
- origRecord.setLastWriteOpTimeTs(origTs);
+ return opTime;
+ }();
DBDirectClient client(opCtx());
- client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), origRecord.toBSON());
-
- Session txnState(sessionId);
- ASSERT_THROWS(txnState.begin(opCtx(), txnNum - 1), AssertionException);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_EQ(origTs, txnState.getLastWriteOpTimeTs());
-
- // Confirm that nothing changed in storage.
-
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("StartingNewSessionWithOlderEntryInStorageShouldUpdateEntry");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
-
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON("_id" << sessionId.toBSON())});
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next());
+ ASSERT(!cursor->more());
ASSERT_EQ(sessionId, txnRecord.getSessionId());
ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(origTs, txnRecord.getLastWriteOpTimeTs());
-
- ASSERT_FALSE(cursor->more());
+ ASSERT_EQ(opTime.getTimestamp(), txnRecord.getLastWriteOpTimeTs());
+ ASSERT_EQ(opTime.getTimestamp(), session.getLastWriteOpTimeTs(txnNum));
}
-TEST_F(SessionTest, StoreOpTime) {
+TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
- const Timestamp ts1(100, 42);
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
+ const auto writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, Timestamp prevTs) {
+ session.beginTxn(opCtx(), txnNum);
- {
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
-
- txnState.saveTxnProgress(opCtx(), ts1);
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp());
wuow.commit();
- }
- DBDirectClient client(opCtx());
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
+ return opTime.getTimestamp();
+ };
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("StoreOpTime 1");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
+ const auto firstTs = writeTxnRecordFn(100, 0, Timestamp());
+ const auto secondTs = writeTxnRecordFn(200, 1, firstTs);
+ DBDirectClient client(opCtx());
+ auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(),
+ {BSON("_id" << sessionId.toBSON())});
+ ASSERT(cursor);
+ ASSERT(cursor->more());
+
+ auto txnRecord = SessionTxnRecord::parse(
+ IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next());
+ ASSERT(!cursor->more());
ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs());
-
- ASSERT_FALSE(cursor->more());
+ ASSERT_EQ(200, txnRecord.getTxnNum());
+ ASSERT_EQ(secondTs, txnRecord.getLastWriteOpTimeTs());
+ ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200));
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_EQ(ts1, txnState.getLastWriteOpTimeTs());
+ session.invalidate();
+ session.refreshFromStorageIfNeeded(opCtx());
+ ASSERT_EQ(secondTs, session.getLastWriteOpTimeTs(200));
+}
- const Timestamp ts2(200, 23);
- {
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
- WriteUnitOfWork wuow(opCtx());
+TEST_F(SessionTest, StartingOldTxnShouldAssert) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- txnState.saveTxnProgress(opCtx(), ts2);
- wuow.commit();
- }
+ const TxnNumber txnNum = 20;
+ session.beginTxn(opCtx(), txnNum);
- cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
+ ASSERT_THROWS_CODE(
+ session.beginTxn(opCtx(), txnNum - 1), AssertionException, ErrorCodes::TransactionTooOld);
+ ASSERT(session.getLastWriteOpTimeTs(txnNum).isNull());
+}
- ASSERT_TRUE(cursor->more());
- doc = cursor->next();
- IDLParserErrorContext ctx2("StoreOpTime 2");
- txnRecord = SessionTxnRecord::parse(ctx2, doc);
+TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(ts2, txnRecord.getLastWriteOpTimeTs());
+ // Drop the transactions table
+ BSONObj dropResult;
+ DBDirectClient client(opCtx());
+ const auto& nss = NamespaceString::kSessionTransactionsTableNamespace;
+ ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult));
- ASSERT_FALSE(cursor->more());
+ const TxnNumber txnNum = 21;
+ session.beginTxn(opCtx(), txnNum);
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_EQ(ts2, txnState.getLastWriteOpTimeTs());
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
+ ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()),
+ AssertionException);
}
-TEST_F(SessionTest, CanBumpTransactionIdIfNewer) {
+TEST_F(SessionTest, CheckStatementExecuted) {
const auto sessionId = makeLogicalSessionIdForTest();
- TxnNumber txnNum = 20;
- const Timestamp ts1(100, 42);
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
- {
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
+ const auto writeTxnRecordFn = [&](StmtId stmtId, Timestamp prevTs) {
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
-
- txnState.saveTxnProgress(opCtx(), ts1);
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevTs);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime.getTimestamp());
wuow.commit();
- }
-
- DBDirectClient client(opCtx());
- Query queryAll;
- queryAll.sort(BSON("_id" << 1));
-
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
- auto doc = cursor->next();
- IDLParserErrorContext ctx1("CanBumpTransactionIdIfNewer 1");
- auto txnRecord = SessionTxnRecord::parse(ctx1, doc);
-
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs());
-
- ASSERT_FALSE(cursor->more());
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_EQ(ts1, txnState.getLastWriteOpTimeTs());
+ return opTime.getTimestamp();
+ };
- // Start a new transaction on the same session.
- txnState.begin(opCtx(), ++txnNum);
+ ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000));
+ const auto firstTs = writeTxnRecordFn(1000, Timestamp());
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
- cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
+ ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000));
+ writeTxnRecordFn(2000, firstTs);
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
- ASSERT_TRUE(cursor->more());
- doc = cursor->next();
- IDLParserErrorContext ctx2("CanBumpTransactionIdIfNewer 2");
- txnRecord = SessionTxnRecord::parse(ctx2, doc);
+ // Invalidate the session and ensure the statements still check out
+ session.invalidate();
+ session.refreshFromStorageIfNeeded(opCtx());
- ASSERT_EQ(sessionId, txnRecord.getSessionId());
- ASSERT_EQ(txnNum, txnRecord.getTxnNum());
- ASSERT_TRUE(txnRecord.getLastWriteOpTimeTs().isNull());
-
- ASSERT_FALSE(cursor->more());
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT_TRUE(txnState.getLastWriteOpTimeTs().isNull());
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000));
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000));
}
-TEST_F(SessionTest, StartingNewSessionWithDroppedTableShouldAssert) {
+TEST_F(SessionTest, CheckStatementExecutedForOldTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
-
- const auto& ns = NamespaceString::kSessionTransactionsTableNamespace;
-
- BSONObj dropResult;
- DBDirectClient client(opCtx());
- ASSERT_TRUE(client.runCommand(ns.db().toString(), BSON("drop" << ns.coll()), dropResult));
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- Session txnState(sessionId);
- ASSERT_THROWS(txnState.begin(opCtx(), txnNum), AssertionException);
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
- ASSERT_EQ(sessionId, txnState.getSessionId());
+ ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum - 1, 0),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, SaveTxnProgressShouldAssertIfTableIsDropped) {
+TEST_F(SessionTest, CheckStatementExecutedForInvalidatedTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
- const Timestamp ts1(100, 42);
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- const auto& ns = NamespaceString::kSessionTransactionsTableNamespace;
-
- BSONObj dropResult;
- DBDirectClient client(opCtx());
- ASSERT_TRUE(client.runCommand(ns.db().toString(), BSON("drop" << ns.coll()), dropResult));
-
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
- WriteUnitOfWork wuow(opCtx());
+ Session session(sessionId);
+ session.invalidate();
- ASSERT_THROWS(txnState.saveTxnProgress(opCtx(), ts1), AssertionException);
+ ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), 100, 0),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, TwoSessionsShouldBeIndependent) {
- const auto sessionId1 = makeLogicalSessionIdForTest();
- const TxnNumber txnNum1 = 20;
- const Timestamp ts1(1903, 42);
-
- Session txnState1(sessionId1);
- txnState1.begin(opCtx(), txnNum1);
-
- const auto sessionId2 = makeLogicalSessionIdForTest();
- const TxnNumber txnNum2 = 300;
- const Timestamp ts2(671, 5);
+TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) {
+ const auto sessionId = makeLogicalSessionIdForTest();
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- Session txnState2(sessionId2);
- txnState2.begin(opCtx(), txnNum2);
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
{
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
-
- txnState2.saveTxnProgress(opCtx(), ts2);
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
wuow.commit();
}
{
- AutoGetCollection autoColl(opCtx(), NamespaceString("test.user"), MODE_IX);
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
-
- txnState1.saveTxnProgress(opCtx(), ts1);
- wuow.commit();
- }
-
- DBDirectClient client(opCtx());
- Query queryAll;
- queryAll.sort(BSON(SessionTxnRecord::kTxnNumFieldName << 1));
-
- auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), queryAll);
- ASSERT_TRUE(cursor.get() != nullptr);
-
- ASSERT_TRUE(cursor->more());
-
- {
- auto doc = cursor->next();
- IDLParserErrorContext ctx("TwoSessionsShouldBeIndependent 1");
- auto txnRecord = SessionTxnRecord::parse(ctx, doc);
-
- ASSERT_EQ(sessionId1, txnRecord.getSessionId());
- ASSERT_EQ(txnNum1, txnRecord.getTxnNum());
- ASSERT_EQ(ts1, txnRecord.getLastWriteOpTimeTs());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime.getTimestamp()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
-
- ASSERT_TRUE(cursor->more());
-
- {
- auto doc = cursor->next();
- IDLParserErrorContext ctx("TwoSessionsShouldBeIndependent 2");
- auto txnRecord = SessionTxnRecord::parse(ctx, doc);
-
- ASSERT_EQ(sessionId2, txnRecord.getSessionId());
- ASSERT_EQ(txnNum2, txnRecord.getTxnNum());
- ASSERT_EQ(ts2, txnRecord.getLastWriteOpTimeTs());
- }
-
- ASSERT_FALSE(cursor->more());
}
-TEST_F(SessionTest, CheckStatementExecuted) {
+TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
- const StmtId stmtId = 5;
-
- opCtx()->setLogicalSessionId(sessionId);
- opCtx()->setTxnNumber(txnNum);
-
Session session(sessionId);
- session.begin(opCtx(), txnNum);
-
- // Returns nothing if the statement has not been executed.
- auto fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId);
- ASSERT_FALSE(fetchedEntry);
-
- // Returns the correct oplog entry if the statement has completed.
- auto optimeTs = Timestamp(50, 10);
-
- OperationSessionInfo opSessionInfo;
- opSessionInfo.setSessionId(sessionId);
- opSessionInfo.setTxnNumber(txnNum);
-
- repl::OplogEntry oplogEntry(repl::OpTime(optimeTs, 1),
- 0,
- repl::OpTypeEnum::kInsert,
- NamespaceString("a.b"),
- 0,
- BSON("_id" << 1 << "x" << 5));
- oplogEntry.setOperationSessionInfo(opSessionInfo);
- oplogEntry.setStatementId(stmtId);
- oplogEntry.setPrevWriteTsInTransaction(Timestamp(0, 0));
- insertOplogEntry(oplogEntry);
-
- {
- AutoGetCollection autoColl(opCtx(), NamespaceString("a.b"), MODE_IX);
- WriteUnitOfWork wuow(opCtx());
+ session.refreshFromStorageIfNeeded(opCtx());
- session.saveTxnProgress(opCtx(), optimeTs);
- wuow.commit();
- }
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
- fetchedEntry = session.checkStatementExecuted(opCtx(), stmtId);
- ASSERT_TRUE(fetchedEntry);
- ASSERT_EQ(fetchedEntry->getStatementId().get(), stmtId);
-
- // Still returns nothing for uncompleted statements.
- auto uncompletedStmtId = 10;
- fetchedEntry = session.checkStatementExecuted(opCtx(), uncompletedStmtId);
- ASSERT_FALSE(fetchedEntry);
-}
-
-TEST_F(SessionTest, BeginReloadsStateAfterReset) {
- const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
-
- const TxnNumber newTxnNum = 50;
- const auto newTs = Timestamp(1, 1);
- Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, newTs);
- txnState.reset();
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
- txnState.begin(opCtx(), newTxnNum);
+ session.invalidate();
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(newTxnNum, txnState.getTxnNum());
- ASSERT_EQ(txnState.getLastWriteOpTimeTs(), newTs);
+ ASSERT_THROWS_CODE(
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp()),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
-TEST_F(SessionTest, BeginDoesNotReloadWithoutReset) {
+// TODO: These tests require the storage engine used for testing to support document-level
+// concurrency control
+#if (0)
+TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresOldTransaction) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
-
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
-
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
-
- const TxnNumber newTxnNum = 30;
- const auto newTs = Timestamp(1, 1);
- Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, newTs);
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- txnState.begin(opCtx(), txnNum);
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
-}
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
-TEST_F(SessionTest, StartingOldTxnFailsAfterReset) {
- const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber oldTxnNum = 20;
+ {
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
+ }
- Session txnState(sessionId);
- txnState.begin(opCtx(), oldTxnNum);
+ // Mimics a different thread starting a newer transaction on the same session
+ const TxnNumber newTxnNum = 200;
+
+ stdx::async(stdx::launch::async, [&sessionId, &session, newTxnNum] {
+ Client::initThreadIfNotAlready();
+ auto sideOpCtx = Client::getCurrent()->makeOperationContext();
+ AutoGetCollection autoColl(sideOpCtx.get(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(sideOpCtx.get());
+ const auto opTime = logOp(sideOpCtx.get(), kNss, sessionId, newTxnNum, 1000);
+ session.onWriteOpCompletedOnPrimary(
+ sideOpCtx.get(), newTxnNum, {1000}, opTime.getTimestamp());
+ wuow.commit();
+ }).get();
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(oldTxnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
+ wuow.commit();
- const TxnNumber newTxnNum = 30;
- Session::updateSessionRecord(opCtx(), sessionId, newTxnNum, Timestamp());
- txnState.reset();
+ // The newer transaction must win
+ ASSERT(!session.checkStatementExecuted(opCtx(), newTxnNum, 0));
+ ASSERT(session.checkStatementExecuted(opCtx(), newTxnNum, 1000));
- ASSERT_THROWS(txnState.begin(opCtx(), oldTxnNum), AssertionException);
+ ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0),
+ AssertionException,
+ ErrorCodes::ConflictingOperationInProgress);
}
+#endif
-TEST_F(SessionTest, CanStartLaterTxnAfterReset) {
+TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
const auto sessionId = makeLogicalSessionIdForTest();
- const TxnNumber txnNum = 20;
+ Session session(sessionId);
+ session.refreshFromStorageIfNeeded(opCtx());
- Session txnState(sessionId);
- txnState.begin(opCtx(), txnNum);
+ const TxnNumber txnNum = 100;
+ session.beginTxn(opCtx(), txnNum);
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(txnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
+ {
+ AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
+ WriteUnitOfWork wuow(opCtx());
+ const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0);
+ session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime.getTimestamp());
- txnState.reset();
+ session.invalidate();
- const TxnNumber newTxnNum = 40;
- txnState.begin(opCtx(), newTxnNum);
+ wuow.commit();
+ }
- ASSERT_EQ(sessionId, txnState.getSessionId());
- ASSERT_EQ(newTxnNum, txnState.getTxnNum());
- ASSERT(txnState.getLastWriteOpTimeTs().isNull());
+ session.refreshFromStorageIfNeeded(opCtx());
+ ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0));
}
} // namespace