/**
 * Copyright (C) 2017 MongoDB, Inc.
 *
 * This program is free software: you can redistribute it and/or modify
 * it under the terms of the GNU Affero General Public License, version 3,
 * as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU Affero General Public License for more details.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 *
 * As a special exception, the copyright holders give permission to link the
 * code of portions of this program with the OpenSSL library under certain
 * conditions as described in each individual source file and distribute
 * linked combinations including the program with the OpenSSL library. You
 * must comply with the GNU Affero General Public License in all respects for
 * all of the code used other than as permitted herein. If you modify file(s)
 * with this exception, you may extend this exception to your version of the
 * file(s), but you are not obligated to do so. If you do not wish to do so,
 * delete this exception statement from your version. If you delete this
 * exception statement from all source files in the program, then also delete
 * it in the license file.
*/ #include "mongo/platform/basic.h" #include "mongo/db/client.h" #include "mongo/db/db_raii.h" #include "mongo/db/dbdirectclient.h" #include "mongo/db/operation_context.h" #include "mongo/db/repl/mock_repl_coord_server_fixture.h" #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog.h" #include "mongo/db/stats/fill_locker_info.h" #include "mongo/stdx/future.h" #include "mongo/stdx/memory.h" #include "mongo/unittest/unittest.h" #include "mongo/util/net/sock.h" namespace mongo { namespace { const NamespaceString kNss("TestDB", "TestColl"); const OptionalCollectionUUID kUUID; /** * Creates an OplogEntry with given parameters and preset defaults for this test suite. */ repl::OplogEntry makeOplogEntry(repl::OpTime opTime, repl::OpTypeEnum opType, BSONObj object, OperationSessionInfo sessionInfo, boost::optional wallClockTime, boost::optional stmtId, boost::optional prevWriteOpTimeInTransaction) { return repl::OplogEntry( opTime, // optime 0, // hash opType, // opType kNss, // namespace boost::none, // uuid boost::none, // fromMigrate 0, // version object, // o boost::none, // o2 sessionInfo, // sessionInfo boost::none, // upsert wallClockTime, // wall clock time stmtId, // statement id prevWriteOpTimeInTransaction, // optime of previous write within same transaction boost::none, // pre-image optime boost::none); // post-image optime } class SessionTest : public MockReplCoordServerFixture { protected: void setUp() final { MockReplCoordServerFixture::setUp(); auto service = opCtx()->getServiceContext(); SessionCatalog::get(service)->reset_forTest(); SessionCatalog::get(service)->onStepUp(opCtx()); } SessionCatalog* catalog() { return SessionCatalog::get(opCtx()->getServiceContext()); } static repl::OpTime logOp(OperationContext* opCtx, const NamespaceString& nss, const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId) { return 
logOp(opCtx, nss, lsid, txnNumber, stmtId, {}); } static repl::OpTime logOp(OperationContext* opCtx, const NamespaceString& nss, const LogicalSessionId& lsid, TxnNumber txnNumber, StmtId stmtId, repl::OpTime prevOpTime) { OperationSessionInfo osi; osi.setSessionId(lsid); osi.setTxnNumber(txnNumber); repl::OplogLink link; link.prevOpTime = prevOpTime; return repl::logOp(opCtx, "n", nss, kUUID, BSON("TestValue" << 0), nullptr, false, Date_t::now(), osi, stmtId, link); } void bumpTxnNumberFromDifferentOpCtx(Session* session, TxnNumber newTxnNum) { // Stash the original client. auto originalClient = Client::releaseCurrent(); // Create a migration client and opCtx. auto service = opCtx()->getServiceContext(); auto migrationClientOwned = service->makeClient("migrationClient"); auto migrationClient = migrationClientOwned.get(); Client::setCurrent(std::move(migrationClientOwned)); auto migrationOpCtx = migrationClient->makeOperationContext(); // Check that there is a transaction in progress with a lower txnNumber. ASSERT(session->inMultiDocumentTransaction()); ASSERT_LT(session->getActiveTxnNumberForTest(), newTxnNum); // Check that the transaction has some operations, so we can ensure they are cleared. ASSERT_GT(session->transactionOperationsForTest().size(), 0u); // Bump the active transaction number on the session. This should clear all state from the // previous transaction. session->beginOrContinueTxnOnMigration(migrationOpCtx.get(), newTxnNum); ASSERT_EQ(session->getActiveTxnNumberForTest(), newTxnNum); ASSERT_FALSE(session->inMultiDocumentTransaction()); ASSERT_FALSE(session->transactionIsAborted()); ASSERT_EQ(session->transactionOperationsForTest().size(), 0u); // Restore the original client. 
migrationOpCtx.reset(); Client::releaseCurrent(); Client::setCurrent(std::move(originalClient)); } }; TEST_F(SessionTest, SessionEntryNotWrittenOnBegin) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); ASSERT_EQ(sessionId, session.getSessionId()); ASSERT(session.getLastWriteOpTime(txnNum).isNull()); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON("_id" << sessionId.toBSON())}); ASSERT(cursor); ASSERT(!cursor->more()); } TEST_F(SessionTest, SessionEntryWrittenAtFirstWrite) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 21; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); const auto opTime = [&] { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); return opTime; }(); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON("_id" << sessionId.toBSON())}); ASSERT(cursor); ASSERT(cursor->more()); auto txnRecord = SessionTxnRecord::parse( IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next()); ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(txnNum, txnRecord.getTxnNum()); ASSERT_EQ(opTime, txnRecord.getLastWriteOpTime()); ASSERT_EQ(opTime, session.getLastWriteOpTime(txnNum)); } TEST_F(SessionTest, StartingNewerTransactionUpdatesThePersistedSession) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const auto 
writeTxnRecordFn = [&](TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime) { session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; }; const auto firstOpTime = writeTxnRecordFn(100, 0, {}); const auto secondOpTime = writeTxnRecordFn(200, 1, firstOpTime); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace.ns(), {BSON("_id" << sessionId.toBSON())}); ASSERT(cursor); ASSERT(cursor->more()); auto txnRecord = SessionTxnRecord::parse( IDLParserErrorContext("SessionEntryWrittenAtFirstWrite"), cursor->next()); ASSERT(!cursor->more()); ASSERT_EQ(sessionId, txnRecord.getSessionId()); ASSERT_EQ(200, txnRecord.getTxnNum()); ASSERT_EQ(secondOpTime, txnRecord.getLastWriteOpTime()); ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); session.invalidate(); session.refreshFromStorageIfNeeded(opCtx()); ASSERT_EQ(secondOpTime, session.getLastWriteOpTime(200)); } TEST_F(SessionTest, StartingOldTxnShouldAssert) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 20; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum - 1, boost::none, boost::none), AssertionException, ErrorCodes::TransactionTooOld); ASSERT(session.getLastWriteOpTime(txnNum).isNull()); } TEST_F(SessionTest, SessionTransactionsCollectionNotDefaultCreated) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); // Drop the transactions table BSONObj dropResult; DBDirectClient client(opCtx()); const auto& nss = 
NamespaceString::kSessionTransactionsTableNamespace; ASSERT(client.runCommand(nss.db().toString(), BSON("drop" << nss.coll()), dropResult)); const TxnNumber txnNum = 21; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); ASSERT_THROWS(session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), AssertionException); } TEST_F(SessionTest, CheckStatementExecuted) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); const auto writeTxnRecordFn = [&](StmtId stmtId, repl::OpTime prevOpTime) { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, stmtId, prevOpTime); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {stmtId}, opTime, Date_t::now()); wuow.commit(); return opTime; }; ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); const auto firstOpTime = writeTxnRecordFn(1000, {}); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(!session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(!session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); writeTxnRecordFn(2000, firstOpTime); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); // Invalidate the session and ensure the statements still check out session.invalidate(); session.refreshFromStorageIfNeeded(opCtx()); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1000)); 
ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); } TEST_F(SessionTest, CheckStatementExecutedForOldTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum - 1, 0), AssertionException, ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, CheckStatementExecutedForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.invalidate(); ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), 100, 0), AssertionException, ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); wuow.commit(); } { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum - 1, 0); ASSERT_THROWS_CODE( session.onWriteOpCompletedOnPrimary(opCtx(), txnNum - 1, {0}, opTime, Date_t::now()), AssertionException, ErrorCodes::ConflictingOperationInProgress); } } TEST_F(SessionTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); 
session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); session.invalidate(); ASSERT_THROWS_CODE( session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()), AssertionException, ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 100; session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = logOp(opCtx(), kNss, sessionId, txnNum, 0); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {0}, opTime, Date_t::now()); session.invalidate(); wuow.commit(); } session.refreshFromStorageIfNeeded(opCtx()); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 0)); } TEST_F(SessionTest, IncompleteHistoryDueToOpLogTruncation) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 2; { OperationSessionInfo osi; osi.setSessionId(sessionId); osi.setTxnNumber(txnNum); auto entry0 = makeOplogEntry(repl::OpTime(Timestamp(100, 0), 0), // optime repl::OpTypeEnum::kInsert, // op type BSON("x" << 0), // o osi, // session info Date_t::now(), // wall clock time 0, // statement id boost::none); // optime of previous write within same transaction // Intentionally skip writing the oplog entry for statement 0, so that it appears as if the // chain of log entries is broken because of oplog truncation auto entry1 = makeOplogEntry(repl::OpTime(Timestamp(100, 1), 0), // optime repl::OpTypeEnum::kInsert, // op type BSON("x" << 1), // o osi, // session info Date_t::now(), // wall 
clock time 1, // statement id entry0.getOpTime()); // optime of previous write within same transaction insertOplogEntry(entry1); auto entry2 = makeOplogEntry(repl::OpTime(Timestamp(100, 2), 0), // optime repl::OpTypeEnum::kInsert, // op type BSON("x" << 2), // o osi, // session info Date_t::now(), // wall clock time 2, // statement id entry1.getOpTime()); // optime of previous write within same transaction insertOplogEntry(entry2); DBDirectClient client(opCtx()); client.insert(NamespaceString::kSessionTransactionsTableNamespace.ns(), [&] { SessionTxnRecord sessionRecord; sessionRecord.setSessionId(sessionId); sessionRecord.setTxnNum(txnNum); sessionRecord.setLastWriteOpTime(entry2.getOpTime()); sessionRecord.setLastWriteDate(*entry2.getWallClockTime()); return sessionRecord.toBSON(); }()); } Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); ASSERT_THROWS_CODE(session.checkStatementExecuted(opCtx(), txnNum, 0), AssertionException, ErrorCodes::IncompleteTransactionHistory); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 1)); ASSERT(session.checkStatementExecuted(opCtx(), txnNum, 2)); ASSERT_THROWS_CODE(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 0), AssertionException, ErrorCodes::IncompleteTransactionHistory); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 1)); ASSERT(session.checkStatementExecutedNoOplogEntryFetch(txnNum, 2)); } TEST_F(SessionTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 2; OperationSessionInfo osi; osi.setSessionId(sessionId); osi.setTxnNumber(txnNum); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); auto firstOpTime = ([&]() { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto wallClockTime = Date_t::now(); auto opTime = repl::logOp(opCtx(), "i", kNss, kUUID, BSON("x" 
<< 1), &Session::kDeadEndSentinel, false, wallClockTime, osi, 1, {}); session.onWriteOpCompletedOnPrimary(opCtx(), txnNum, {1}, opTime, wallClockTime); wuow.commit(); return opTime; })(); { repl::OplogLink link; link.prevOpTime = firstOpTime; AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto wallClockTime = Date_t::now(); auto opTime = repl::logOp(opCtx(), "n", kNss, kUUID, {}, &Session::kDeadEndSentinel, false, wallClockTime, osi, kIncompleteHistoryStmtId, link); session.onWriteOpCompletedOnPrimary( opCtx(), txnNum, {kIncompleteHistoryStmtId}, opTime, wallClockTime); wuow.commit(); } { auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); ASSERT_TRUE(oplog); ASSERT_EQ(firstOpTime, oplog->getOpTime()); } ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); // Should have the same behavior after loading state from storage. session.invalidate(); session.refreshFromStorageIfNeeded(opCtx()); { auto oplog = session.checkStatementExecuted(opCtx(), txnNum, 1); ASSERT_TRUE(oplog); ASSERT_EQ(firstOpTime, oplog->getOpTime()); } ASSERT_THROWS(session.checkStatementExecuted(opCtx(), txnNum, 2), AssertionException); } TEST_F(SessionTest, StashAndUnstashResources) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 20; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); Locker* originalLocker = opCtx()->lockState(); RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit(); ASSERT(originalLocker); ASSERT(originalRecoveryUnit); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); repl::ReadConcernArgs readConcernArgs; ASSERT_OK(readConcernArgs.initialize(BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName << BSON(repl::ReadConcernArgs::kLevelFieldName << "snapshot")))); repl::ReadConcernArgs::get(opCtx()) = readConcernArgs; // Perform 
initial unstash which sets up a WriteUnitOfWork. session.unstashTransactionResources(opCtx(), "find"); ASSERT_EQUALS(originalLocker, opCtx()->lockState()); ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit()); ASSERT(opCtx()->getWriteUnitOfWork()); // Take a lock. This is expected in order to stash resources. Lock::GlobalRead lk(opCtx(), Date_t::now()); ASSERT(lk.isLocked()); // Stash resources. The original Locker and RecoveryUnit now belong to the stash. opCtx()->setStashedCursor(); session.stashTransactionResources(opCtx()); ASSERT_NOT_EQUALS(originalLocker, opCtx()->lockState()); ASSERT_NOT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit()); ASSERT(!opCtx()->getWriteUnitOfWork()); // Unset the read concern on the OperationContext. This is needed to unstash. repl::ReadConcernArgs::get(opCtx()) = repl::ReadConcernArgs(); // Unstash the stashed resources. This restores the original Locker and RecoveryUnit to the // OperationContext. session.unstashTransactionResources(opCtx(), "find"); ASSERT_EQUALS(originalLocker, opCtx()->lockState()); ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit()); ASSERT(opCtx()->getWriteUnitOfWork()); // Commit the transaction. This allows us to release locks. session.commitTransaction(opCtx()); } TEST_F(SessionTest, ReportStashedResources) { const auto sessionId = makeLogicalSessionIdForTest(); const TxnNumber txnNum = 20; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); ASSERT(opCtx()->lockState()); ASSERT(opCtx()->recoveryUnit()); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none); repl::ReadConcernArgs readConcernArgs; ASSERT_OK(readConcernArgs.initialize(BSON("find" << "test" << repl::ReadConcernArgs::kReadConcernFieldName << BSON(repl::ReadConcernArgs::kLevelFieldName << "snapshot")))); repl::ReadConcernArgs::get(opCtx()) = readConcernArgs; // Perform initial unstash which sets up a WriteUnitOfWork. 
session.unstashTransactionResources(opCtx(), "find"); ASSERT(opCtx()->getWriteUnitOfWork()); // Take a lock. This is expected in order to stash resources. Lock::GlobalRead lk(opCtx(), Date_t::now()); ASSERT(lk.isLocked()); // Build a BSONObj containing the details which we expect to see reported when we call // Session::reportStashedState. const auto lockerInfo = opCtx()->lockState()->getLockerInfo(); ASSERT(lockerInfo); auto reportBuilder = std::move(BSONObjBuilder() << "host" << getHostNameCachedAndPort() << "desc" << "inactive transaction" << "lsid" << sessionId.toBSON() << "txnNumber" << txnNum << "waitingForLock" << false << "active" << false); fillLockerInfo(*lockerInfo, reportBuilder); // Stash resources. The original Locker and RecoveryUnit now belong to the stash. opCtx()->setStashedCursor(); session.stashTransactionResources(opCtx()); ASSERT(!opCtx()->getWriteUnitOfWork()); // Verify that the Session's report of its own stashed state aligns with our expectations. ASSERT_BSONOBJ_EQ(session.reportStashedState(), reportBuilder.obj()); // Unset the read concern on the OperationContext. This is needed to unstash. repl::ReadConcernArgs::get(opCtx()) = repl::ReadConcernArgs(); // Unstash the stashed resources. This restores the original Locker and RecoveryUnit to the // OperationContext. session.unstashTransactionResources(opCtx(), "commitTransaction"); ASSERT(opCtx()->getWriteUnitOfWork()); // With the resources unstashed, verify that the Session reports an empty stashed state. ASSERT(session.reportStashedState().isEmpty()); // Commit the transaction. This allows us to release locks. 
session.commitTransaction(opCtx()); } TEST_F(SessionTest, CannotSpecifyStartTransactionOnInProgressTxn) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); // Autocommit should be true by default ASSERT(session.getAutocommit()); const TxnNumber txnNum = 100; // Must specify startTransaction=true and autocommit=false to start a transaction. session.beginOrContinueTxn(opCtx(), txnNum, false, true); // Autocommit should be set to false and we should be in a mult-doc transaction. ASSERT_FALSE(session.getAutocommit()); ASSERT_TRUE(session.inSnapshotReadOrMultiDocumentTransaction()); // Cannot try to start a transaction that already started. ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum, false, true), AssertionException, ErrorCodes::ConflictingOperationInProgress); } TEST_F(SessionTest, AutocommitRequiredOnEveryTxnOp) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); // Autocommit should be true by default ASSERT(session.getAutocommit()); const TxnNumber txnNum = 100; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); // We must have stashed transaction resources to do a second operation on the transaction. session.unstashTransactionResources(opCtx(), "insert"); // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); // Autocommit should be set to false ASSERT_FALSE(session.getAutocommit()); // Omitting 'autocommit' after the first statement of a transaction should throw an error. ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum, boost::none, boost::none), AssertionException, ErrorCodes::InvalidOptions); // Setting 'autocommit=true' should throw an error. 
ASSERT_THROWS_CODE(session.beginOrContinueTxn(opCtx(), txnNum, true, boost::none), AssertionException, ErrorCodes::InvalidOptions); // Including autocommit=false should succeed. session.beginOrContinueTxn(opCtx(), txnNum, false, boost::none); } TEST_F(SessionTest, TransactionsOnlyPermitAllowedReadPreferences) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); TxnNumber txnNum = 1; // // Multi-statement transaction operations can only be run with 'readPreference=primary'. // auto startTxnWithReadPref = [&](ReadPreference readPref, boost::optional autocommit, boost::optional startTransaction) { txnNum++; ReadPreferenceSetting::get(opCtx()) = ReadPreferenceSetting(readPref); session.beginOrContinueTxn(opCtx(), txnNum, autocommit, startTransaction); }; // Shouldn't throw. startTxnWithReadPref(ReadPreference::PrimaryOnly, false, true); ASSERT_TRUE(session.inSnapshotReadOrMultiDocumentTransaction()); // All unsupported read preferences should throw. ASSERT_THROWS_CODE(startTxnWithReadPref(ReadPreference::PrimaryPreferred, false, true), AssertionException, 50789); ASSERT_THROWS_CODE(startTxnWithReadPref(ReadPreference::SecondaryOnly, false, true), AssertionException, 50789); ASSERT_THROWS_CODE(startTxnWithReadPref(ReadPreference::SecondaryPreferred, false, true), AssertionException, 50789); ASSERT_THROWS_CODE( startTxnWithReadPref(ReadPreference::Nearest, false, true), AssertionException, 50789); // // Operations that are not on a multi-statement transaction are allowed to specify any // readPreference. // auto activeTxnNum = TxnNumber{-1}; // None of these should throw. Each should start a transaction with a new, higher, transaction // number. 
startTxnWithReadPref(ReadPreference::PrimaryOnly, boost::none, boost::none); ASSERT_GT(session.getActiveTxnNumberForTest(), activeTxnNum); activeTxnNum = session.getActiveTxnNumberForTest(); startTxnWithReadPref(ReadPreference::PrimaryPreferred, boost::none, boost::none); ASSERT_GT(session.getActiveTxnNumberForTest(), activeTxnNum); activeTxnNum = session.getActiveTxnNumberForTest(); startTxnWithReadPref(ReadPreference::SecondaryOnly, boost::none, boost::none); ASSERT_GT(session.getActiveTxnNumberForTest(), activeTxnNum); activeTxnNum = session.getActiveTxnNumberForTest(); startTxnWithReadPref(ReadPreference::SecondaryPreferred, boost::none, boost::none); ASSERT_GT(session.getActiveTxnNumberForTest(), activeTxnNum); activeTxnNum = session.getActiveTxnNumberForTest(); startTxnWithReadPref(ReadPreference::Nearest, boost::none, boost::none); ASSERT_GT(session.getActiveTxnNumberForTest(), activeTxnNum); } TEST_F(SessionTest, SameTransactionPreservesStoredStatements) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 22; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); // We must have stashed transaction resources to re-open the transaction. session.unstashTransactionResources(opCtx(), "insert"); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); session.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); // Check the transaction operations before re-opening the transaction. 
ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); // Re-opening the same transaction should have no effect. session.beginOrContinueTxn(opCtx(), txnNum, false, boost::none); ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); } TEST_F(SessionTest, AbortClearsStoredStatements) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 24; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); session.unstashTransactionResources(opCtx(), "insert"); auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0)); session.addTransactionOperation(opCtx(), operation); ASSERT_BSONOBJ_EQ(operation.toBSON(), session.transactionOperationsForTest()[0].toBSON()); // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); session.abortArbitraryTransaction(); ASSERT_TRUE(session.transactionOperationsForTest().empty()); ASSERT_TRUE(session.transactionIsAborted()); } // This test makes sure the commit machinery works even when no operations are done on the // transaction. TEST_F(SessionTest, EmptyTransactionCommit) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 25; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); session.unstashTransactionResources(opCtx(), "commitTransaction"); // The transaction machinery cannot store an empty locker. 
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); session.commitTransaction(opCtx()); session.stashTransactionResources(opCtx()); ASSERT_TRUE(session.transactionIsCommitted()); } // This test makes sure the abort machinery works even when no operations are done on the // transaction. TEST_F(SessionTest, EmptyTransactionAbort) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 26; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); session.unstashTransactionResources(opCtx(), "abortTransaction"); // The transaction machinery cannot store an empty locker. { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); } session.stashTransactionResources(opCtx()); session.abortArbitraryTransaction(); ASSERT_TRUE(session.transactionIsAborted()); } TEST_F(SessionTest, ConcurrencyOfUnstashAndAbort) { const auto sessionId = makeLogicalSessionIdForTest(); Session session(sessionId); session.refreshFromStorageIfNeeded(opCtx()); const TxnNumber txnNum = 26; opCtx()->setLogicalSessionId(sessionId); opCtx()->setTxnNumber(txnNum); session.beginOrContinueTxn(opCtx(), txnNum, false, true); // The transaction may be aborted without checking out the session. session.abortArbitraryTransaction(); // An unstash after an abort should uassert. 
ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx(), "find"),
                   AssertionException,
                   ErrorCodes::NoSuchTransaction);
}

// Tests that unstashing fails with ConflictingOperationInProgress when the session's active
// transaction number is bumped (via bumpTxnNumberFromDifferentOpCtx()) after the resources of the
// original transaction have been stashed.
TEST_F(SessionTest, ConcurrencyOfUnstashAndMigration) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    // The transaction machinery cannot store an empty locker.
    { Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now()); }
    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    session.addTransactionOperation(opCtx(), operation);
    session.stashTransactionResources(opCtx());

    // A migration may bump the active transaction number without checking out the session.
    const TxnNumber higherTxnNum = 27;
    bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);

    // An unstash after a migration that bumps the active transaction number should uassert.
    ASSERT_THROWS_CODE(session.unstashTransactionResources(opCtx(), "insert"),
                       AssertionException,
                       ErrorCodes::ConflictingOperationInProgress);
}

// Tests that stashTransactionResources() can still be called after the transaction has been
// aborted through abortArbitraryTransaction(); the call is expected to complete without raising.
TEST_F(SessionTest, ConcurrencyOfStashAndAbort) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "find");

    // The transaction may be aborted without checking out the session.
    session.abortArbitraryTransaction();

    // A stash after an abort should be a noop.
    session.stashTransactionResources(opCtx());
}

// Tests that stashing fails with ConflictingOperationInProgress when the session's active
// transaction number is bumped (via bumpTxnNumberFromDifferentOpCtx()) while the original
// transaction still holds its resources.
TEST_F(SessionTest, ConcurrencyOfStashAndMigration) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    session.addTransactionOperation(opCtx(), operation);

    // A migration may bump the active transaction number without checking out the session.
    const TxnNumber higherTxnNum = 27;
    bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);

    // A stash after a migration that bumps the active transaction number should uassert.
    ASSERT_THROWS_CODE(session.stashTransactionResources(opCtx()),
                       AssertionException,
                       ErrorCodes::ConflictingOperationInProgress);
}

// Tests that addTransactionOperation() fails with TransactionAborted once the transaction has
// been aborted through abortArbitraryTransaction().
TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndAbort) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    // The transaction may be aborted without checking out the session.
    session.abortArbitraryTransaction();

    // An addTransactionOperation() after an abort should uassert.
    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation),
                       AssertionException,
                       ErrorCodes::TransactionAborted);
}

// Tests that addTransactionOperation() succeeds once, and then fails with
// ConflictingOperationInProgress after the session's active transaction number is bumped (via
// bumpTxnNumberFromDifferentOpCtx()).
TEST_F(SessionTest, ConcurrencyOfAddTransactionOperationAndMigration) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "find");

    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    session.addTransactionOperation(opCtx(), operation);

    // A migration may bump the active transaction number without checking out the session.
    const TxnNumber higherTxnNum = 27;
    bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);

    // An addTransactionOperation() after a migration that bumps the active transaction number
    // should uassert.
    ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation),
                       AssertionException,
                       ErrorCodes::ConflictingOperationInProgress);
}

// Tests that endTransactionAndRetrieveOperations() fails with TransactionAborted once the
// transaction has been aborted through abortArbitraryTransaction().
TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    // The transaction may be aborted without checking out the session.
    session.abortArbitraryTransaction();

    // An endTransactionAndRetrieveOperations() after an abort should uassert.
    ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()),
                       AssertionException,
                       ErrorCodes::TransactionAborted);
}

// Tests that endTransactionAndRetrieveOperations() fails with ConflictingOperationInProgress
// after the session's active transaction number is bumped (via
// bumpTxnNumberFromDifferentOpCtx()).
TEST_F(SessionTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    session.addTransactionOperation(opCtx(), operation);

    // A migration may bump the active transaction number without checking out the session.
    const TxnNumber higherTxnNum = 27;
    bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);

    // An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction
    // number should uassert.
    ASSERT_THROWS_CODE(session.endTransactionAndRetrieveOperations(opCtx()),
                       AssertionException,
                       ErrorCodes::ConflictingOperationInProgress);
}

// Tests that commitTransaction() fails with TransactionAborted once the transaction has been
// aborted through abortArbitraryTransaction().
TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndAbort) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "commitTransaction");

    // The transaction may be aborted without checking out the session.
    session.abortArbitraryTransaction();

    // A commitTransaction() after an abort should uassert.
    ASSERT_THROWS_CODE(
        session.commitTransaction(opCtx()), AssertionException, ErrorCodes::TransactionAborted);
}

// Tests that commitTransaction() fails with ConflictingOperationInProgress after the session's
// active transaction number is bumped (via bumpTxnNumberFromDifferentOpCtx()).
TEST_F(SessionTest, ConcurrencyOfCommitTransactionAndMigration) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 26;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
    session.addTransactionOperation(opCtx(), operation);

    // A migration may bump the active transaction number without checking out the session.
    const TxnNumber higherTxnNum = 27;
    bumpTxnNumberFromDifferentOpCtx(&session, higherTxnNum);

    // A commitTransaction() after a migration that bumps the active transaction number should
    // uassert.
    ASSERT_THROWS_CODE(session.commitTransaction(opCtx()),
                       AssertionException,
                       ErrorCodes::ConflictingOperationInProgress);
}

// Tests that a transaction aborts if it becomes too large before trying to commit it.
TEST_F(SessionTest, TransactionTooLargeWhileBuilding) {
    const auto sessionId = makeLogicalSessionIdForTest();
    Session session(sessionId);
    session.refreshFromStorageIfNeeded(opCtx());

    const TxnNumber txnNum = 28;
    opCtx()->setLogicalSessionId(sessionId);
    opCtx()->setTxnNumber(txnNum);
    session.beginOrContinueTxn(opCtx(), txnNum, false, true);

    session.unstashTransactionResources(opCtx(), "insert");

    // Two 6MB operations should succeed; three 6MB operations should fail.
    constexpr size_t kBigDataSize = 6 * 1024 * 1024;
    // The trailing "()" value-initializes the buffer, so the payload is all zero bytes. The array
    // form of unique_ptr is required so that the buffer is released with delete[].
    std::unique_ptr<uint8_t[]> bigData(new uint8_t[kBigDataSize]());
    auto operation = repl::OplogEntry::makeInsertOperation(
        kNss,
        kUUID,
        BSON("_id" << 0 << "data" << BSONBinData(bigData.get(), kBigDataSize, BinDataGeneral)));
    session.addTransactionOperation(opCtx(), operation);
    session.addTransactionOperation(opCtx(), operation);
    ASSERT_THROWS_CODE(session.addTransactionOperation(opCtx(), operation),
                       AssertionException,
                       ErrorCodes::TransactionTooLarge);
}

}  // namespace
}  // namespace mongo