/**
* Copyright (C) 2018 MongoDB, Inc.
*
* This program is free software: you can redistribute it and/or modify
* it under the terms of the GNU Affero General Public License, version 3,
* as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful,
* but WITHOUT ANY WARRANTY; without even the implied warranty of
* MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU Affero General Public License for more details.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see .
*
* As a special exception, the copyright holders give permission to link the
* code of portions of this program with the OpenSSL library under certain
* conditions as described in each individual source file and distribute
* linked combinations including the program with the OpenSSL library. You
* must comply with the GNU Affero General Public License in all respects for
* all of the code used other than as permitted herein. If you modify file(s)
* with this exception, you may extend this exception to your version of the
* file(s), but you are not obligated to do so. If you do not wish to do so,
* delete this exception statement from your version. If you delete this
* exception statement from all source files in the program, then also delete
* it in the license file.
*/
#include "mongo/platform/basic.h"
#include "mongo/db/client.h"
#include "mongo/db/db_raii.h"
#include "mongo/db/dbdirectclient.h"
#include "mongo/db/op_observer_noop.h"
#include "mongo/db/op_observer_registry.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/operation_context_session_mongod.h"
#include "mongo/db/repl/mock_repl_coord_server_fixture.h"
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
#include "mongo/db/stats/fill_locker_info.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/stdx/future.h"
#include "mongo/stdx/memory.h"
#include "mongo/unittest/death_test.h"
#include "mongo/unittest/unittest.h"
#include "mongo/util/clock_source_mock.h"
#include "mongo/util/net/socket_utils.h"
#include "mongo/util/tick_source_mock.h"
namespace mongo {
namespace {
const NamespaceString kNss("TestDB", "TestColl");
const OptionalCollectionUUID kUUID;
/**
* Creates an OplogEntry with given parameters and preset defaults for this test suite.
*/
repl::OplogEntry makeOplogEntry(repl::OpTime opTime,
repl::OpTypeEnum opType,
BSONObj object,
OperationSessionInfo sessionInfo,
boost::optional wallClockTime,
boost::optional stmtId,
boost::optional prevWriteOpTimeInTransaction) {
return repl::OplogEntry(
opTime, // optime
0, // hash
opType, // opType
kNss, // namespace
boost::none, // uuid
boost::none, // fromMigrate
0, // version
object, // o
boost::none, // o2
sessionInfo, // sessionInfo
boost::none, // upsert
wallClockTime, // wall clock time
stmtId, // statement id
prevWriteOpTimeInTransaction, // optime of previous write within same transaction
boost::none, // pre-image optime
boost::none); // post-image optime
}
class OpObserverMock : public OpObserverNoop {
public:
void onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) override;
bool onTransactionPrepareThrowsException = false;
bool transactionPrepared = false;
stdx::function onTransactionPrepareFn = []() {};
void onTransactionCommit(OperationContext* opCtx,
boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) override;
bool onTransactionCommitThrowsException = false;
bool transactionCommitted = false;
stdx::function, boost::optional)>
onTransactionCommitFn = [](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {};
void onTransactionAbort(OperationContext* opCtx,
boost::optional abortOplogEntryOpTime) override;
bool onTransactionAbortThrowsException = false;
bool transactionAborted = false;
stdx::function onTransactionAbortFn = []() {};
};
void OpObserverMock::onTransactionPrepare(OperationContext* opCtx, const OplogSlot& prepareOpTime) {
ASSERT_TRUE(opCtx->lockState()->inAWriteUnitOfWork());
OpObserverNoop::onTransactionPrepare(opCtx, prepareOpTime);
uassert(ErrorCodes::OperationFailed,
"onTransactionPrepare() failed",
!onTransactionPrepareThrowsException);
transactionPrepared = true;
onTransactionPrepareFn();
}
void OpObserverMock::onTransactionCommit(OperationContext* opCtx,
boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
if (commitOplogEntryOpTime) {
invariant(commitTimestamp);
ASSERT_FALSE(opCtx->lockState()->inAWriteUnitOfWork());
// The 'commitTimestamp' must be cleared before we write the oplog entry.
ASSERT(opCtx->recoveryUnit()->getCommitTimestamp().isNull());
} else {
invariant(!commitTimestamp);
ASSERT(opCtx->lockState()->inAWriteUnitOfWork());
}
OpObserverNoop::onTransactionCommit(opCtx, commitOplogEntryOpTime, commitTimestamp);
uassert(ErrorCodes::OperationFailed,
"onTransactionCommit() failed",
!onTransactionCommitThrowsException);
transactionCommitted = true;
onTransactionCommitFn(commitOplogEntryOpTime, commitTimestamp);
}
void OpObserverMock::onTransactionAbort(OperationContext* opCtx,
boost::optional abortOplogEntryOpTime) {
OpObserverNoop::onTransactionAbort(opCtx, abortOplogEntryOpTime);
uassert(ErrorCodes::OperationFailed,
"onTransactionAbort() failed",
!onTransactionAbortThrowsException);
transactionAborted = true;
onTransactionAbortFn();
}
// When this class is in scope, makes the system behave as if we're in a DBDirectClient
class DirectClientSetter {
public:
explicit DirectClientSetter(OperationContext* opCtx)
: _opCtx(opCtx), _wasInDirectClient(opCtx->getClient()->isInDirectClient()) {
opCtx->getClient()->setInDirectClient(true);
}
~DirectClientSetter() {
_opCtx->getClient()->setInDirectClient(_wasInDirectClient);
}
private:
const OperationContext* _opCtx;
const bool _wasInDirectClient;
};
class TxnParticipantTest : public MockReplCoordServerFixture {
protected:
void setUp() override {
MockReplCoordServerFixture::setUp();
auto service = opCtx()->getServiceContext();
MongoDSessionCatalog::onStepUp(opCtx());
OpObserverRegistry* opObserverRegistry =
dynamic_cast(service->getOpObserver());
auto mockObserver = stdx::make_unique();
_opObserver = mockObserver.get();
opObserverRegistry->addObserver(std::move(mockObserver));
_sessionId = makeLogicalSessionIdForTest();
_txnNumber = 20;
opCtx()->setLogicalSessionId(_sessionId);
opCtx()->setTxnNumber(_txnNumber);
}
void tearDown() override {
// Clear all sessions to free up any stashed resources.
SessionCatalog::get(opCtx()->getServiceContext())->reset_forTest();
MockReplCoordServerFixture::tearDown();
_opObserver = nullptr;
}
SessionCatalog* catalog() {
return SessionCatalog::get(opCtx()->getServiceContext());
}
void runFunctionFromDifferentOpCtx(std::function func) {
// Stash the original client.
auto originalClient = Client::releaseCurrent();
// Create a new client (e.g. for migration) and opCtx.
auto service = opCtx()->getServiceContext();
auto newClientOwned = service->makeClient("newClient");
auto newClient = newClientOwned.get();
Client::setCurrent(std::move(newClientOwned));
auto newOpCtx = newClient->makeOperationContext();
ON_BLOCK_EXIT([&] {
// Restore the original client.
newOpCtx.reset();
Client::releaseCurrent();
Client::setCurrent(std::move(originalClient));
});
// Run the function on bahalf of another operation context.
func(newOpCtx.get());
}
void bumpTxnNumberFromDifferentOpCtx(const LogicalSessionId& sessionId, TxnNumber newTxnNum) {
auto func = [sessionId, newTxnNum](OperationContext* opCtx) {
auto session = SessionCatalog::get(opCtx)->getOrCreateSession(opCtx, sessionId);
auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(session.get());
// Check that there is a transaction in progress with a lower txnNumber.
ASSERT(txnParticipant->inMultiDocumentTransaction());
ASSERT_LT(txnParticipant->getActiveTxnNumber(), newTxnNum);
// Check that the transaction has some operations, so we can ensure they are cleared.
ASSERT_GT(txnParticipant->transactionOperationsForTest().size(), 0u);
// Bump the active transaction number on the txnParticipant. This should clear all state
// from the previous transaction.
txnParticipant->beginOrContinue(newTxnNum, boost::none, boost::none);
ASSERT_EQ(newTxnNum, txnParticipant->getActiveTxnNumber());
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT_EQ(txnParticipant->transactionOperationsForTest().size(), 0u);
};
runFunctionFromDifferentOpCtx(func);
}
OpObserverMock* _opObserver = nullptr;
LogicalSessionId _sessionId;
TxnNumber _txnNumber;
};
// Test that transaction lock acquisition times out in `maxTransactionLockRequestTimeoutMillis`
// milliseconds.
TEST_F(TxnParticipantTest, TransactionThrowsLockTimeoutIfLockIsUnavailable) {
const std::string dbName = "TestDB";
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
{ Lock::DBLock dbXLock(opCtx(), dbName, MODE_X); }
txnParticipant->stashTransactionResources(opCtx());
auto clientWithDatabaseXLock = Client::releaseCurrent();
/**
* Make a new Session, Client, OperationContext and transaction and then attempt to take the
* same database exclusive lock, which should conflict because the other transaction already
* took it.
*/
auto service = opCtx()->getServiceContext();
auto newClientOwned = service->makeClient("newTransactionClient");
auto newClient = newClientOwned.get();
Client::setCurrent(std::move(newClientOwned));
const auto newSessionId = makeLogicalSessionIdForTest();
const TxnNumber newTxnNum = 10;
{
// Limit the scope of the new opCtx to make sure that it gets destroyed before
// new client is destroyed.
auto newOpCtx = newClient->makeOperationContext();
newOpCtx.get()->setLogicalSessionId(newSessionId);
newOpCtx.get()->setTxnNumber(newTxnNum);
OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true);
auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "insert");
Date_t t1 = Date_t::now();
ASSERT_THROWS_CODE(Lock::DBLock(newOpCtx.get(), dbName, MODE_X),
AssertionException,
ErrorCodes::LockTimeout);
Date_t t2 = Date_t::now();
int defaultMaxTransactionLockRequestTimeoutMillis = 5;
ASSERT_GTE(t2 - t1, Milliseconds(defaultMaxTransactionLockRequestTimeoutMillis));
// A non-conflicting lock acquisition should work just fine.
{ Lock::DBLock tempLock(newOpCtx.get(), "NewTestDB", MODE_X); }
}
// Restore the original client so that teardown works.
Client::releaseCurrent();
Client::setCurrent(std::move(clientWithDatabaseXLock));
}
TEST_F(TxnParticipantTest, StashAndUnstashResources) {
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
ASSERT(originalLocker);
ASSERT(originalRecoveryUnit);
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
// Perform initial unstash which sets up a WriteUnitOfWork.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
ASSERT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(opCtx()->getWriteUnitOfWork());
ASSERT(opCtx()->lockState()->isLocked());
// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
txnParticipant->stashTransactionResources(opCtx());
ASSERT_NOT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_NOT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(!opCtx()->getWriteUnitOfWork());
// Unset the read concern on the OperationContext. This is needed to unstash.
repl::ReadConcernArgs::get(opCtx()) = repl::ReadConcernArgs();
// Unstash the stashed resources. This restores the original Locker and RecoveryUnit to the
// OperationContext.
txnParticipant->unstashTransactionResources(opCtx(), "find");
ASSERT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(opCtx()->getWriteUnitOfWork());
// Commit the transaction. This allows us to release locks.
txnParticipant->commitUnpreparedTransaction(opCtx());
}
TEST_F(TxnParticipantTest, CannotSpecifyStartTransactionOnInProgressTxn) {
// Must specify startTransaction=true and autocommit=false to start a transaction.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT_TRUE(txnParticipant->inMultiDocumentTransaction());
// Cannot try to start a transaction that already started.
ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, true),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, AutocommitRequiredOnEveryTxnOp) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// We must have stashed transaction resources to do a second operation on the transaction.
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
auto txnNum = *opCtx()->getTxnNumber();
// Omitting 'autocommit' after the first statement of a transaction should throw an error.
ASSERT_THROWS_CODE(txnParticipant->beginOrContinue(txnNum, boost::none, boost::none),
AssertionException,
ErrorCodes::InvalidOptions);
// Including autocommit=false should succeed.
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, boost::none);
}
DEATH_TEST_F(TxnParticipantTest, AutocommitCannotBeTrue, "invariant") {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Passing 'autocommit=true' is not allowed and should crash.
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), true, boost::none);
}
DEATH_TEST_F(TxnParticipantTest, StartTransactionCannotBeFalse, "invariant") {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Passing 'startTransaction=false' is not allowed and should crash.
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, false);
}
TEST_F(TxnParticipantTest, SameTransactionPreservesStoredStatements) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// We must have stashed transaction resources to re-open the transaction.
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
ASSERT_BSONOBJ_EQ(operation.toBSON(),
txnParticipant->transactionOperationsForTest()[0].toBSON());
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
// Check the transaction operations before re-opening the transaction.
ASSERT_BSONOBJ_EQ(operation.toBSON(),
txnParticipant->transactionOperationsForTest()[0].toBSON());
// Re-opening the same transaction should have no effect.
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), false, boost::none);
ASSERT_BSONOBJ_EQ(operation.toBSON(),
txnParticipant->transactionOperationsForTest()[0].toBSON());
}
TEST_F(TxnParticipantTest, AbortClearsStoredStatements) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
ASSERT_BSONOBJ_EQ(operation.toBSON(),
txnParticipant->transactionOperationsForTest()[0].toBSON());
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransaction();
ASSERT_TRUE(txnParticipant->transactionOperationsForTest().empty());
ASSERT_TRUE(txnParticipant->transactionIsAborted());
}
// This test makes sure the commit machinery works even when no operations are done on the
// transaction.
TEST_F(TxnParticipantTest, EmptyTransactionCommit) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
txnParticipant->commitUnpreparedTransaction(opCtx());
txnParticipant->stashTransactionResources(opCtx());
ASSERT_TRUE(txnParticipant->transactionIsCommitted());
}
TEST_F(TxnParticipantTest, CommitTransactionSetsCommitTimestampOnPreparedTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
const auto userCommitTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
originalFn(commitOplogEntryOpTime, commitTimestamp);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_EQ(userCommitTimestamp, commitTimestamp);
};
txnParticipant->commitPreparedTransaction(opCtx(), userCommitTimestamp);
// The recovery unit is reset on commit.
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
txnParticipant->stashTransactionResources(opCtx());
ASSERT_TRUE(txnParticipant->transactionIsCommitted());
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
}
TEST_F(TxnParticipantTest, CommitTransactionWithCommitTimestampFailsOnUnpreparedTransaction) {
const auto commitTimestamp = Timestamp(6, 6);
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
ASSERT_THROWS_CODE(txnParticipant->commitPreparedTransaction(opCtx(), commitTimestamp),
AssertionException,
ErrorCodes::InvalidOptions);
}
TEST_F(TxnParticipantTest, CommitTransactionDoesNotSetCommitTimestampOnUnpreparedTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
originalFn(commitOplogEntryOpTime, commitTimestamp);
ASSERT_FALSE(commitOplogEntryOpTime);
ASSERT_FALSE(commitTimestamp);
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
};
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
txnParticipant->stashTransactionResources(opCtx());
ASSERT_TRUE(txnParticipant->transactionIsCommitted());
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
}
TEST_F(TxnParticipantTest, CommitTransactionWithoutCommitTimestampFailsOnPreparedTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()),
AssertionException,
ErrorCodes::InvalidOptions);
}
TEST_F(TxnParticipantTest, CommitTransactionWithNullCommitTimestampFailsOnPreparedTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant->commitPreparedTransaction(opCtx(), Timestamp()),
AssertionException,
ErrorCodes::InvalidOptions);
}
TEST_F(TxnParticipantTest,
CommitTransactionWithCommitTimestampLessThanPrepareTimestampFailsOnPreparedTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant->commitPreparedTransaction(
opCtx(), Timestamp(prepareTimestamp.getSecs() - 1, 1)),
AssertionException,
ErrorCodes::InvalidOptions);
}
// This test makes sure the abort machinery works even when no operations are done on the
// transaction.
TEST_F(TxnParticipantTest, EmptyTransactionAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransaction();
ASSERT_TRUE(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// An unstash after an abort should uassert.
ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "find"),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, ConcurrencyOfUnstashAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->stashTransactionResources(opCtx());
// A migration may bump the active transaction number without checking out the
// txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// An unstash after a migration that bumps the active transaction number should uassert.
ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "insert"),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, ConcurrencyOfStashAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
// The transaction may be aborted without checking out the txnParticipant->
txnParticipant->abortArbitraryTransaction();
// A stash after an abort should be a noop.
txnParticipant->stashTransactionResources(opCtx());
}
TEST_F(TxnParticipantTest, ConcurrencyOfStashAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
// A migration may bump the active transaction number without checking out the
// txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// A stash after a migration that bumps the active transaction number should uassert.
ASSERT_THROWS_CODE(txnParticipant->stashTransactionResources(opCtx()),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// An addTransactionOperation() after an abort should uassert.
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
ASSERT_THROWS_CODE(txnParticipant->addTransactionOperation(opCtx(), operation),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, ConcurrencyOfAddTransactionOperationAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
// A migration may bump the active transaction number without checking out the
// txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// An addTransactionOperation() after a migration that bumps the active transaction number
// should uassert.
ASSERT_THROWS_CODE(txnParticipant->addTransactionOperation(opCtx(), operation),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// An endTransactionAndRetrieveOperations() after an abort should uassert.
ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, ConcurrencyOfEndTransactionAndRetrieveOperationsAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
// A migration may bump the active transaction number without checking out the txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// An endTransactionAndRetrieveOperations() after a migration that bumps the active transaction
// number should uassert.
ASSERT_THROWS_CODE(txnParticipant->endTransactionAndRetrieveOperations(opCtx()),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// A commitUnpreparedTransaction() after an abort should uassert.
ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, ConcurrencyOfCommitPreparedTransactionAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
auto prepareTS = txnParticipant->prepareTransaction(opCtx(), {});
txnParticipant->abortArbitraryTransaction();
// A commitPreparedTransaction() after an abort should succeed since the abort should fail.
txnParticipant->commitPreparedTransaction(opCtx(), prepareTS);
ASSERT(_opObserver->transactionCommitted);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(txnParticipant->transactionIsCommitted());
}
TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndArbitraryAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT(txnParticipant->inMultiDocumentTransaction());
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// The operation throws for some reason and aborts implicitly.
// Abort active transaction after it's been aborted by KillSession is a no-op.
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
ASSERT(opCtx()->getWriteUnitOfWork() == nullptr);
}
TEST_F(TxnParticipantTest, ConcurrencyOfActiveUnpreparedAbortAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
ASSERT(txnParticipant->inMultiDocumentTransaction());
// A migration may bump the active transaction number without checking out the txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
ASSERT_THROWS_CODE(txnParticipant->abortActiveTransaction(opCtx()),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
// The abort fails so the OperationContext state is not cleaned up until the operation is
// complete. The session has already moved on to a new transaction so the transaction will not
// remain active beyond this operation.
ASSERT_FALSE(opCtx()->getWriteUnitOfWork() == nullptr);
}
TEST_F(TxnParticipantTest, ConcurrencyOfActivePreparedAbortAndArbitraryAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT(txnParticipant->inMultiDocumentTransaction());
txnParticipant->prepareTransaction(opCtx(), {});
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
// The operation throws for some reason and aborts implicitly.
// Abort active transaction after it's been aborted by KillSession is a no-op.
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
ASSERT(opCtx()->getWriteUnitOfWork() == nullptr);
}
TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT(txnParticipant->transactionIsAborted());
// A prepareTransaction() after an abort should uassert.
ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}),
AssertionException,
ErrorCodes::NoSuchTransaction);
ASSERT_FALSE(_opObserver->transactionPrepared);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
auto ruPrepareTimestamp = Timestamp();
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
ASSERT_FALSE(ruPrepareTimestamp.isNull());
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
};
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
ASSERT(_opObserver->transactionPrepared);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
}
DEATH_TEST_F(TxnParticipantTest, AbortDuringPrepareIsFatal, "Invariant") {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
};
txnParticipant->prepareTransaction(opCtx(), {});
}
TEST_F(TxnParticipantTest, ThrowDuringOnTransactionPrepareAbortsTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
_opObserver->onTransactionPrepareThrowsException = true;
ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}),
AssertionException,
ErrorCodes::OperationFailed);
ASSERT_FALSE(_opObserver->transactionPrepared);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, KillSessionsDuringPreparedCommitDoesNotAbortTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
const auto userCommitTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
originalFn(commitOplogEntryOpTime, commitTimestamp);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_EQ(*commitTimestamp, userCommitTimestamp);
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
};
txnParticipant->commitPreparedTransaction(opCtx(), userCommitTimestamp);
// The recovery unit is reset on commit.
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
ASSERT(_opObserver->transactionCommitted);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(txnParticipant->transactionIsCommitted());
}
TEST_F(TxnParticipantTest, ArbitraryAbortDuringPreparedCommitDoesNotAbortTransaction) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
const auto userCommitTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
auto originalFn = _opObserver->onTransactionCommitFn;
_opObserver->onTransactionCommitFn = [&](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
originalFn(commitOplogEntryOpTime, commitTimestamp);
ASSERT(commitOplogEntryOpTime);
ASSERT(commitTimestamp);
ASSERT_EQ(*commitTimestamp, userCommitTimestamp);
// The transaction may be aborted without checking out the txnParticipant.
auto func = [&](OperationContext* opCtx) { txnParticipant->abortArbitraryTransaction(); };
runFunctionFromDifferentOpCtx(func);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
};
txnParticipant->commitPreparedTransaction(opCtx(), userCommitTimestamp);
// The recovery unit is reset on commit.
ASSERT(opCtx()->recoveryUnit()->getCommitTimestamp().isNull());
ASSERT(_opObserver->transactionCommitted);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(txnParticipant->transactionIsCommitted());
}
DEATH_TEST_F(TxnParticipantTest,
ThrowDuringPreparedOnTransactionCommitIsFatal,
"Caught exception during commit") {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
_opObserver->onTransactionCommitThrowsException = true;
const auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp);
}
TEST_F(TxnParticipantTest, ThrowDuringUnpreparedCommitLetsTheAbortAtEntryPointToCleanUp) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
_opObserver->onTransactionCommitThrowsException = true;
ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()),
AssertionException,
ErrorCodes::OperationFailed);
ASSERT_FALSE(_opObserver->transactionCommitted);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT_FALSE(txnParticipant->transactionIsCommitted());
// Simulate the abort at entry point.
txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx());
ASSERT_TRUE(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ConcurrencyOfCommitUnpreparedTransactionAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
// A migration may bump the active transaction number without checking out the txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// A commitUnpreparedTransaction() after a migration that bumps the active transaction number
// should uassert.
ASSERT_THROWS_CODE(txnParticipant->commitUnpreparedTransaction(opCtx()),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
}
TEST_F(TxnParticipantTest, ConcurrencyOfPrepareTransactionAndMigration) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
// A migration may bump the active transaction number without checking out the txnParticipant.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum);
// A prepareTransaction() after a migration that bumps the active transaction number should
// uassert.
ASSERT_THROWS_CODE(txnParticipant->prepareTransaction(opCtx(), {}),
AssertionException,
ErrorCodes::ConflictingOperationInProgress);
ASSERT_FALSE(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, ContinuingATransactionWithNoResourcesAborts) {
OperationContextSessionMongod(opCtx(), true, false, true);
ASSERT_THROWS_CODE(OperationContextSessionMongod(opCtx(), true, false, boost::none),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto ruPrepareTimestamp = Timestamp();
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
ASSERT_FALSE(ruPrepareTimestamp.isNull());
};
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransaction();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto ruPrepareTimestamp = Timestamp();
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
ASSERT_FALSE(ruPrepareTimestamp.isNull());
};
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransactionIfExpired();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInProgress) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto ruPrepareTimestamp = Timestamp();
auto originalFn = _opObserver->onTransactionPrepareFn;
_opObserver->onTransactionPrepareFn = [&]() {
originalFn();
ruPrepareTimestamp = opCtx()->recoveryUnit()->getPrepareTimestamp();
ASSERT_FALSE(ruPrepareTimestamp.isNull());
};
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
{
// Try to start a new transaction while there is already a prepared transaction on the
// session. This should fail with a PreparedTransactionInProgress error.
auto func = [&](OperationContext* newOpCtx) {
auto session = SessionCatalog::get(newOpCtx)->getOrCreateSession(
newOpCtx, *opCtx()->getLogicalSessionId());
auto txnParticipant =
TransactionParticipant::getFromNonCheckedOutSession(session.get());
ASSERT_THROWS_CODE(
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber() + 1, false, true),
AssertionException,
ErrorCodes::PreparedTransactionInProgress);
};
runFunctionFromDifferentOpCtx(func);
}
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, CannotInsertInPreparedTransaction) {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "insert"),
AssertionException,
ErrorCodes::PreparedTransactionInProgress);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, MigrationThrowsOnPreparedTransaction) {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->prepareTransaction(opCtx(), {});
// A migration may bump the active transaction number without checking out the session.
auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
ASSERT_THROWS_CODE(
bumpTxnNumberFromDifferentOpCtx(*opCtx()->getLogicalSessionId(), higherTxnNum),
AssertionException,
ErrorCodes::PreparedTransactionInProgress);
// The transaction is not affected.
ASSERT_TRUE(_opObserver->transactionPrepared);
}
TEST_F(TxnParticipantTest, ImplictAbortDoesNotAbortPreparedTransaction) {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->prepareTransaction(opCtx(), {});
// The next command throws an exception and wants to abort the transaction.
// This is a no-op.
txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx());
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT_TRUE(_opObserver->transactionPrepared);
}
DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransaction, "invariant") {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
// Check that the oldest prepareTimestamp is the one we just set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNum = *opCtx()->getTxnNumber();
_opObserver->onTransactionCommitFn = [&](boost::optional commitOplogEntryOpTime,
boost::optional commitTimestamp) {
// This should never happen.
auto func = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNum);
// Hit an invariant. This should never happen.
txnParticipant->abortActiveTransaction(opCtx);
};
runFunctionFromDifferentOpCtx(func);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
};
txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp);
// Check that we removed the prepareTimestamp from the set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
}
TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) {
ASSERT_THROWS_CODE(OperationContextSessionMongod(opCtx(), true, false, boost::none),
AssertionException,
ErrorCodes::NoSuchTransaction);
}
// Tests that a transaction aborts if it becomes too large before trying to commit it.
TEST_F(TxnParticipantTest, TransactionTooLargeWhileBuilding) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// Two 6MB operations should succeed; three 6MB operations should fail.
constexpr size_t kBigDataSize = 6 * 1024 * 1024;
std::unique_ptr bigData(new uint8_t[kBigDataSize]());
auto operation = repl::OplogEntry::makeInsertOperation(
kNss,
kUUID,
BSON("_id" << 0 << "data" << BSONBinData(bigData.get(), kBigDataSize, BinDataGeneral)));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->addTransactionOperation(opCtx(), operation);
ASSERT_THROWS_CODE(txnParticipant->addTransactionOperation(opCtx(), operation),
AssertionException,
ErrorCodes::TransactionTooLarge);
}
TEST_F(TxnParticipantTest, StashInNestedSessionIsANoop) {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
ASSERT(originalLocker);
ASSERT(originalRecoveryUnit);
// Set the readConcern on the OperationContext.
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
// Perform initial unstash, which sets up a WriteUnitOfWork.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
ASSERT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(opCtx()->getWriteUnitOfWork());
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
OperationContextSessionMongod innerScopedSession(opCtx(), true, boost::none, boost::none);
txnParticipant->stashTransactionResources(opCtx());
// The stash was a noop, so the locker, RecoveryUnit, and WriteUnitOfWork on the
// OperationContext are unaffected.
ASSERT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT(opCtx()->getWriteUnitOfWork());
}
}
TEST_F(TxnParticipantTest, UnstashInNestedSessionIsANoop) {
OperationContextSessionMongod outerScopedSession(opCtx(), true, false, true);
Locker* originalLocker = opCtx()->lockState();
RecoveryUnit* originalRecoveryUnit = opCtx()->recoveryUnit();
ASSERT(originalLocker);
ASSERT(originalRecoveryUnit);
// Set the readConcern on the OperationContext.
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
{
// Make it look like we're in a DBDirectClient running a nested operation.
DirectClientSetter inDirectClient(opCtx());
OperationContextSessionMongod innerScopedSession(opCtx(), true, boost::none, boost::none);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
// The unstash was a noop, so the OperationContext did not get a WriteUnitOfWork.
ASSERT_EQUALS(originalLocker, opCtx()->lockState());
ASSERT_EQUALS(originalRecoveryUnit, opCtx()->recoveryUnit());
ASSERT_FALSE(opCtx()->getWriteUnitOfWork());
}
}
/**
* Test fixture for testing behavior that depends on a server's cluster role.
*
* Each test case relies on the txnNumber on the operation context, which cannot be changed, so
* define tests for behavior shared by config and shard servers as methods here and call them in the
* fixtures for config and shard servers defined below.
*/
class ShardedClusterParticipantTest : public TxnParticipantTest {
protected:
void canSpecifyStartTransactionOnInProgressTxn() {
auto autocommit = false;
auto startTransaction = true;
OperationContextSessionMongod opCtxSession(
opCtx(), true /* shouldCheckOutSession */, autocommit, startTransaction);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT(txnParticipant->inMultiDocumentTransaction());
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction);
ASSERT(txnParticipant->inMultiDocumentTransaction());
}
void canSpecifyStartTransactionOnAbortedTxn() {
auto autocommit = false;
auto startTransaction = true;
OperationContextSessionMongod opCtxSession(
opCtx(), true /* shouldCheckOutSession */, autocommit, startTransaction);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT(txnParticipant->inMultiDocumentTransaction());
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction);
ASSERT(txnParticipant->inMultiDocumentTransaction());
}
void cannotSpecifyStartTransactionOnCommittedTxn() {
auto autocommit = false;
auto startTransaction = true;
OperationContextSessionMongod opCtxSession(
opCtx(), true /* shouldCheckOutSession */, autocommit, startTransaction);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT(txnParticipant->inMultiDocumentTransaction());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT_THROWS_CODE(
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction),
AssertionException,
50911);
}
void cannotSpecifyStartTransactionOnPreparedTxn() {
auto autocommit = false;
auto startTransaction = true;
OperationContextSessionMongod opCtxSession(
opCtx(), true /* shouldCheckOutSession */, autocommit, startTransaction);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT(txnParticipant->inMultiDocumentTransaction());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
auto operation = repl::OplogEntry::makeInsertOperation(kNss, kUUID, BSON("TestValue" << 0));
txnParticipant->addTransactionOperation(opCtx(), operation);
txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_THROWS_CODE(
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction),
AssertionException,
50911);
}
void cannotSpecifyStartTransactionOnStartedRetryableWrite() {
boost::optional autocommit = boost::none;
boost::optional startTransaction = boost::none;
OperationContextSessionMongod opCtxSession(
opCtx(), true /* shouldCheckOutSession */, autocommit, startTransaction);
auto txnParticipant = TransactionParticipant::get(opCtx());
ASSERT_FALSE(txnParticipant->inMultiDocumentTransaction());
autocommit = false;
startTransaction = true;
ASSERT_THROWS_CODE(
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), autocommit, startTransaction),
AssertionException,
50911);
}
// TODO SERVER-36639: Add tests that the active transaction number cannot be reused if the
// transaction is in the abort after prepare state (or any state indicating the participant
// has been involved in a two phase commit).
};
/**
* Test fixture for a transaction participant running on a shard server.
*/
class ShardTxnParticipantTest : public ShardedClusterParticipantTest {
protected:
void setUp() final {
TxnParticipantTest::setUp();
serverGlobalParams.clusterRole = ClusterRole::ShardServer;
}
void tearDown() final {
serverGlobalParams.clusterRole = ClusterRole::None;
TxnParticipantTest::tearDown();
}
};
TEST_F(ShardTxnParticipantTest, CanSpecifyStartTransactionOnInProgressTxn) {
canSpecifyStartTransactionOnInProgressTxn();
}
TEST_F(ShardTxnParticipantTest, CanSpecifyStartTransactionOnAbortedTxn) {
canSpecifyStartTransactionOnAbortedTxn();
}
TEST_F(ShardTxnParticipantTest, CannotSpecifyStartTransactionOnCommittedTxn) {
cannotSpecifyStartTransactionOnCommittedTxn();
}
TEST_F(ShardTxnParticipantTest, CannotSpecifyStartTransactionOnPreparedTxn) {
cannotSpecifyStartTransactionOnPreparedTxn();
}
TEST_F(ShardTxnParticipantTest, CannotSpecifyStartTransactionOnStartedRetryableWrite) {
cannotSpecifyStartTransactionOnStartedRetryableWrite();
}
/**
* Test fixture for a transaction participant running on a config server.
*/
class ConfigTxnParticipantTest : public ShardedClusterParticipantTest {
protected:
void setUp() final {
TxnParticipantTest::setUp();
serverGlobalParams.clusterRole = ClusterRole::ConfigServer;
}
void tearDown() final {
serverGlobalParams.clusterRole = ClusterRole::None;
TxnParticipantTest::tearDown();
}
};
TEST_F(ConfigTxnParticipantTest, CanSpecifyStartTransactionOnInProgressTxn) {
canSpecifyStartTransactionOnInProgressTxn();
}
TEST_F(ConfigTxnParticipantTest, CanSpecifyStartTransactionOnAbortedTxn) {
canSpecifyStartTransactionOnAbortedTxn();
}
TEST_F(ConfigTxnParticipantTest, CannotSpecifyStartTransactionOnCommittedTxn) {
cannotSpecifyStartTransactionOnCommittedTxn();
}
TEST_F(ConfigTxnParticipantTest, CannotSpecifyStartTransactionOnPreparedTxn) {
cannotSpecifyStartTransactionOnPreparedTxn();
}
TEST_F(ConfigTxnParticipantTest, CannotSpecifyStartTransactionOnStartedRetryableWrite) {
cannotSpecifyStartTransactionOnStartedRetryableWrite();
}
TEST_F(TxnParticipantTest, KillSessionsDuringUnpreparedAbortSucceeds) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
auto originalFn = _opObserver->onTransactionAbortFn;
_opObserver->onTransactionAbortFn = [&] {
originalFn();
// The transaction may be aborted without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT(txnParticipant->transactionIsAborted());
};
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(_opObserver->transactionAborted);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ActiveAbortIsLegalDuringUnpreparedAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNumber = *opCtx()->getTxnNumber();
auto originalFn = _opObserver->onTransactionAbortFn;
_opObserver->onTransactionAbortFn = [&] {
originalFn();
auto func = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
// Prevent recursion.
_opObserver->onTransactionAbortFn = originalFn;
txnParticipant->abortActiveTransaction(opCtx);
ASSERT(txnParticipant->transactionIsAborted());
};
runFunctionFromDifferentOpCtx(func);
};
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(_opObserver->transactionAborted);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ThrowDuringUnpreparedOnTransactionAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
_opObserver->onTransactionAbortThrowsException = true;
ASSERT_THROWS_CODE(txnParticipant->abortActiveTransaction(opCtx()),
AssertionException,
ErrorCodes::OperationFailed);
}
TEST_F(TxnParticipantTest, KillSessionsDuringPreparedAbortFails) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
txnParticipant->prepareTransaction(opCtx(), {});
auto originalFn = _opObserver->onTransactionAbortFn;
_opObserver->onTransactionAbortFn = [&] {
originalFn();
// KillSessions may attempt to abort without checking out the txnParticipant.
txnParticipant->abortArbitraryTransaction();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
ASSERT(txnParticipant->transactionIsPrepared());
};
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(_opObserver->transactionAborted);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ActiveAbortSucceedsDuringPreparedAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
txnParticipant->prepareTransaction(opCtx(), {});
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNumber = *opCtx()->getTxnNumber();
auto originalFn = _opObserver->onTransactionAbortFn;
_opObserver->onTransactionAbortFn = [&] {
originalFn();
auto func = [&](OperationContext* opCtx) {
opCtx->setLogicalSessionId(sessionId);
opCtx->setTxnNumber(txnNumber);
// Prevent recursion.
_opObserver->onTransactionAbortFn = originalFn;
txnParticipant->abortActiveTransaction(opCtx);
ASSERT(txnParticipant->transactionIsAborted());
};
runFunctionFromDifferentOpCtx(func);
};
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(_opObserver->transactionAborted);
ASSERT(txnParticipant->transactionIsAborted());
}
TEST_F(TxnParticipantTest, ThrowDuringPreparedOnTransactionAbortIsFatal) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
txnParticipant->prepareTransaction(opCtx(), {});
_opObserver->onTransactionAbortThrowsException = true;
ASSERT_THROWS_CODE(txnParticipant->abortActiveTransaction(opCtx()),
AssertionException,
ErrorCodes::OperationFailed);
}
/**
* Test fixture for transactions metrics.
*/
class TransactionsMetricsTest : public TxnParticipantTest {
public:
using TickSourceMicrosecondMock = TickSourceMock;
/**
* Set up and return a mock clock source.
*/
ClockSourceMock* initMockPreciseClockSource() {
getServiceContext()->setPreciseClockSource(stdx::make_unique());
return dynamic_cast(getServiceContext()->getPreciseClockSource());
}
/**
* Set up and return a mock tick source.
*/
TickSourceMicrosecondMock* initMockTickSource() {
getServiceContext()->setTickSource(stdx::make_unique());
auto tickSource =
dynamic_cast(getServiceContext()->getTickSource());
// Ensure that the tick source is not initialized to zero.
tickSource->reset(1);
return tickSource;
}
};
TEST_F(TransactionsMetricsTest, IncrementTotalStartedUponStartTransaction) {
unsigned long long beforeTransactionStart =
ServerTransactionsMetrics::get(opCtx())->getTotalStarted();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
// Tests that the total transactions started counter is incremented by 1 when a new transaction
// is started.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalStarted(),
beforeTransactionStart + 1U);
}
TEST_F(TransactionsMetricsTest, IncrementTotalCommittedOnCommit) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
unsigned long long beforeCommitCount =
ServerTransactionsMetrics::get(opCtx())->getTotalCommitted();
txnParticipant->commitUnpreparedTransaction(opCtx());
// Assert that the committed counter is incremented by 1.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalCommitted(), beforeCommitCount + 1U);
}
TEST_F(TransactionsMetricsTest, IncrementTotalAbortedUponAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
unsigned long long beforeAbortCount =
ServerTransactionsMetrics::get(opCtx())->getTotalAborted();
txnParticipant->abortArbitraryTransaction();
// Assert that the aborted counter is incremented by 1.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalAborted(), beforeAbortCount + 1U);
}
TEST_F(TransactionsMetricsTest, TrackTotalOpenTransactionsWithAbort) {
unsigned long long beforeTransactionStart =
ServerTransactionsMetrics::get(opCtx())->getCurrentOpen();
// Tests that starting a transaction increments the open transactions counter by 1.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(),
beforeTransactionStart + 1U);
// Tests that stashing the transaction resources does not affect the open transactions counter.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(),
beforeTransactionStart + 1U);
// Tests that aborting a transaction decrements the open transactions counter by 1.
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(), beforeTransactionStart);
}
TEST_F(TransactionsMetricsTest, TrackTotalOpenTransactionsWithCommit) {
unsigned long long beforeTransactionStart =
ServerTransactionsMetrics::get(opCtx())->getCurrentOpen();
// Tests that starting a transaction increments the open transactions counter by 1.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(),
beforeTransactionStart + 1U);
// Tests that stashing the transaction resources does not affect the open transactions counter.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(),
beforeTransactionStart + 1U);
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// Tests that committing a transaction decrements the open transactions counter by 1.
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentOpen(), beforeTransactionStart);
}
TEST_F(TransactionsMetricsTest, TrackTotalActiveAndInactiveTransactionsWithCommit) {
unsigned long long beforeActiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentActive();
unsigned long long beforeInactiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentInactive();
// Starting the transaction should put it into an inactive state.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(),
beforeInactiveCounter + 1);
// Tests that the first unstash increments the active counter and decrements the inactive
// counter.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(),
beforeActiveCounter + 1U);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
// Tests that stashing the transaction resources decrements active counter and increments
// inactive counter.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(),
beforeInactiveCounter + 1U);
// Tests that the second unstash increments the active counter and decrements the inactive
// counter.
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(),
beforeActiveCounter + 1U);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
// Tests that committing a transaction decrements the active counter only.
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
}
TEST_F(TransactionsMetricsTest, TrackTotalActiveAndInactiveTransactionsWithStashedAbort) {
unsigned long long beforeActiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentActive();
unsigned long long beforeInactiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentInactive();
// Starting the transaction should put it into an inactive state.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(),
beforeInactiveCounter + 1);
// Tests that the first unstash increments the active counter and decrements the inactive
// counter.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(),
beforeActiveCounter + 1U);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
// Tests that stashing the transaction resources decrements active counter and increments
// inactive counter.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(),
beforeInactiveCounter + 1U);
// Tests that aborting a stashed transaction decrements the inactive counter only.
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
}
TEST_F(TransactionsMetricsTest, TrackTotalActiveAndInactiveTransactionsWithUnstashedAbort) {
unsigned long long beforeActiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentActive();
unsigned long long beforeInactiveCounter =
ServerTransactionsMetrics::get(opCtx())->getCurrentInactive();
// Starting the transaction should put it into an inactive state.
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(),
beforeInactiveCounter + 1);
// Tests that the first unstash increments the active counter and decrements the inactive
// counter.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(),
beforeActiveCounter + 1U);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
// Tests that aborting a stashed transaction decrements the active counter only.
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentActive(), beforeActiveCounter);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getCurrentInactive(), beforeInactiveCounter);
}
TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldBeSetUponCommit) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
// Advance the clock.
tickSource->advance(Microseconds(100));
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(100));
}
TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldBeSetUponAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// Advance the clock.
tickSource->advance(Microseconds(100));
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(100));
}
TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldKeepIncreasingUntilCommit) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
tickSource->advance(Microseconds(100));
// The transaction's duration should have increased.
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(100));
tickSource->advance(Microseconds(100));
// Commit the transaction and check duration.
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(200));
// The transaction committed, so the duration shouldn't have increased even if more time passed.
tickSource->advance(Microseconds(100));
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(200));
}
TEST_F(TransactionsMetricsTest, SingleTransactionStatsDurationShouldKeepIncreasingUntilAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
tickSource->advance(Microseconds(100));
// The transaction's duration should have increased.
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(100));
tickSource->advance(Microseconds(100));
// Abort the transaction and check duration.
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(200));
// The transaction aborted, so the duration shouldn't have increased even if more time passed.
tickSource->advance(Microseconds(100));
ASSERT_EQ(
txnParticipant->getSingleTransactionStats().getDuration(tickSource, tickSource->getTicks()),
Microseconds(200));
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndStash) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time active should be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
tickSource->advance(Microseconds(100));
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
// Advance clock during inactive period.
tickSource->advance(Microseconds(100));
// Time active should have increased only during active period.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
tickSource->advance(Microseconds(100));
txnParticipant->stashTransactionResources(opCtx());
// Advance clock during inactive period.
tickSource->advance(Microseconds(100));
// Time active should have increased again.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{200});
// Start a new transaction.
const auto higherTxnNum = *opCtx()->getTxnNumber() + 1;
txnParticipant->beginOrContinue(higherTxnNum, false, true);
// Time active should be zero for a new transaction.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldBeSetUponUnstashAndAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time active should be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
tickSource->advance(Microseconds(100));
txnParticipant->abortArbitraryTransaction();
// Time active should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
tickSource->advance(Microseconds(100));
// The transaction is not active after abort, so time active should not have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetUponAbortOnly) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time active should be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
// Advance clock during inactive period.
tickSource->advance(Microseconds(100));
txnParticipant->abortArbitraryTransaction();
// Time active should still be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldIncreaseUntilStash) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time active should be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
tickSource->advance(Microseconds(100));
// Time active should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds(100));
tickSource->advance(Microseconds(100));
// Time active should have increased again.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds(200));
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
tickSource->advance(Microseconds(100));
// The transaction is no longer active, so time active should have stopped increasing.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds(200));
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldIncreaseUntilCommit) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time active should be zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
tickSource->advance(Microseconds(100));
// Time active should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
tickSource->advance(Microseconds(100));
// Time active should have increased again.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{200});
txnParticipant->commitUnpreparedTransaction(opCtx());
tickSource->advance(Microseconds(100));
// The transaction is no longer active, so time active should have stopped increasing.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds(200));
}
TEST_F(TransactionsMetricsTest, TimeActiveMicrosShouldNotBeSetIfUnstashHasBadReadConcernArgs) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize bad read concern args (!readConcernArgs.isEmpty()).
repl::ReadConcernArgs readConcernArgs(repl::ReadConcernLevel::kLocalReadConcern);
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
// Transaction resources do not exist yet.
txnParticipant->unstashTransactionResources(opCtx(), "find");
tickSource->advance(Microseconds(100));
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
// Time active should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
// Transaction resources already exist here and should throw an exception due to bad read
// concern arguments.
ASSERT_THROWS_CODE(txnParticipant->unstashTransactionResources(opCtx(), "find"),
AssertionException,
ErrorCodes::InvalidOptions);
tickSource->advance(Microseconds(100));
// Time active should not have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeActiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
}
TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponStash) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize field values for both AdditiveMetrics objects.
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysExamined = 1;
CurOp::get(opCtx())->debug().additiveMetrics.keysExamined = 5;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.docsExamined = 2;
CurOp::get(opCtx())->debug().additiveMetrics.docsExamined = 0;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nMatched = 3;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nModified = 1;
CurOp::get(opCtx())->debug().additiveMetrics.nModified = 1;
CurOp::get(opCtx())->debug().additiveMetrics.ninserted = 4;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nmoved = 3;
CurOp::get(opCtx())->debug().additiveMetrics.nmoved = 2;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysInserted = 1;
CurOp::get(opCtx())->debug().additiveMetrics.keysInserted = 1;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysDeleted = 0;
CurOp::get(opCtx())->debug().additiveMetrics.keysDeleted = 0;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.prepareReadConflicts =
5;
CurOp::get(opCtx())->debug().additiveMetrics.prepareReadConflicts = 4;
auto additiveMetricsToCompare =
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics;
additiveMetricsToCompare.add(CurOp::get(opCtx())->debug().additiveMetrics);
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
ASSERT(txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.equals(
additiveMetricsToCompare));
}
TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponCommit) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize field values for both AdditiveMetrics objects.
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysExamined = 3;
CurOp::get(opCtx())->debug().additiveMetrics.keysExamined = 2;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.docsExamined = 0;
CurOp::get(opCtx())->debug().additiveMetrics.docsExamined = 2;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nMatched = 4;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nModified = 5;
CurOp::get(opCtx())->debug().additiveMetrics.nModified = 1;
CurOp::get(opCtx())->debug().additiveMetrics.ninserted = 1;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.ndeleted = 4;
CurOp::get(opCtx())->debug().additiveMetrics.ndeleted = 0;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysInserted = 1;
CurOp::get(opCtx())->debug().additiveMetrics.keysInserted = 1;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.prepareReadConflicts =
0;
CurOp::get(opCtx())->debug().additiveMetrics.prepareReadConflicts = 0;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.writeConflicts = 6;
CurOp::get(opCtx())->debug().additiveMetrics.writeConflicts = 3;
auto additiveMetricsToCompare =
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics;
additiveMetricsToCompare.add(CurOp::get(opCtx())->debug().additiveMetrics);
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->commitUnpreparedTransaction(opCtx());
ASSERT(txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.equals(
additiveMetricsToCompare));
}
TEST_F(TransactionsMetricsTest, AdditiveMetricsObjectsShouldBeAddedTogetherUponAbort) {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize field values for both AdditiveMetrics objects.
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysExamined = 2;
CurOp::get(opCtx())->debug().additiveMetrics.keysExamined = 4;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.docsExamined = 1;
CurOp::get(opCtx())->debug().additiveMetrics.docsExamined = 3;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nMatched = 2;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nModified = 0;
CurOp::get(opCtx())->debug().additiveMetrics.nModified = 3;
CurOp::get(opCtx())->debug().additiveMetrics.ndeleted = 5;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.nmoved = 0;
CurOp::get(opCtx())->debug().additiveMetrics.nmoved = 2;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysInserted = 1;
CurOp::get(opCtx())->debug().additiveMetrics.keysInserted = 1;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.keysDeleted = 6;
CurOp::get(opCtx())->debug().additiveMetrics.keysDeleted = 0;
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.writeConflicts = 3;
CurOp::get(opCtx())->debug().additiveMetrics.writeConflicts = 3;
auto additiveMetricsToCompare =
txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics;
additiveMetricsToCompare.add(CurOp::get(opCtx())->debug().additiveMetrics);
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->getSingleTransactionStats().getOpDebug()->additiveMetrics.equals(
additiveMetricsToCompare));
}
TEST_F(TransactionsMetricsTest, TimeInactiveMicrosShouldBeSetUponUnstashAndStash) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time inactive should have increased.
tickSource->advance(Microseconds(100));
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
// Time inactive should have increased again.
tickSource->advance(Microseconds(100));
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{200});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
tickSource->advance(Microseconds(100));
// The transaction is currently active, so time inactive should not have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{200});
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
tickSource->advance(Microseconds(100));
// The transaction is inactive again, so time inactive should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{300});
}
TEST_F(TransactionsMetricsTest, TimeInactiveMicrosShouldBeSetUponUnstashAndAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time inactive should be greater than or equal to zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
tickSource->advance(Microseconds(100));
// Time inactive should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
txnParticipant->abortArbitraryTransaction();
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
tickSource->advance(Microseconds(100));
// The transaction has aborted, so time inactive should not have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
}
TEST_F(TransactionsMetricsTest, TimeInactiveMicrosShouldIncreaseUntilCommit) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Time inactive should be greater than or equal to zero.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{0});
tickSource->advance(Microseconds(100));
// Time inactive should have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->commitUnpreparedTransaction(opCtx());
tickSource->advance(Microseconds(100));
// The transaction has committed, so time inactive should not have increased.
ASSERT_EQ(txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(
tickSource, tickSource->getTicks()),
Microseconds{100});
}
TEST_F(TransactionsMetricsTest, ReportStashedResources) {
auto clockSource = initMockPreciseClockSource();
auto startTime = Date_t::now();
clockSource->reset(startTime);
const bool autocommit = false;
ASSERT(opCtx()->lockState());
ASSERT(opCtx()->recoveryUnit());
OperationContextSessionMongod opCtxSession(opCtx(), true, autocommit, true);
// Create a ClientMetadata object and set it on ClientMetadataIsMasterState.
BSONObjBuilder builder;
ASSERT_OK(ClientMetadata::serializePrivate("driverName",
"driverVersion",
"osType",
"osName",
"osArchitecture",
"osVersion",
"appName",
&builder));
auto obj = builder.obj();
auto clientMetadata = ClientMetadata::parse(obj["client"]);
auto& clientMetadataIsMasterState = ClientMetadataIsMasterState::get(opCtx()->getClient());
clientMetadataIsMasterState.setClientMetadata(opCtx()->getClient(),
std::move(clientMetadata.getValue()));
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
// Perform initial unstash which sets up a WriteUnitOfWork.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
ASSERT(opCtx()->getWriteUnitOfWork());
ASSERT(opCtx()->lockState()->isLocked());
// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
txnParticipant->stashTransactionResources(opCtx());
ASSERT(!opCtx()->getWriteUnitOfWork());
// Verify that the Session's report of its own stashed state aligns with our expectations.
auto stashedState = txnParticipant->reportStashedState();
auto transactionDocument = stashedState.getObjectField("transaction");
auto parametersDocument = transactionDocument.getObjectField("parameters");
ASSERT_EQ(stashedState.getField("host").valueStringData().toString(),
getHostNameCachedAndPort());
ASSERT_EQ(stashedState.getField("desc").valueStringData().toString(), "inactive transaction");
ASSERT_BSONOBJ_EQ(stashedState.getField("lsid").Obj(), _sessionId.toBSON());
ASSERT_EQ(parametersDocument.getField("txnNumber").numberLong(), *opCtx()->getTxnNumber());
ASSERT_EQ(parametersDocument.getField("autocommit").boolean(), autocommit);
ASSERT_BSONELT_EQ(parametersDocument.getField("readConcern"),
readConcernArgs.toBSON().getField("readConcern"));
ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0));
ASSERT_EQ(
dateFromISOString(transactionDocument.getField("startWallClockTime").valueStringData())
.getValue(),
startTime);
ASSERT_EQ(
dateFromISOString(transactionDocument.getField("expiryTime").valueStringData()).getValue(),
startTime + stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()});
ASSERT_EQ(stashedState.getField("client").valueStringData().toString(), "");
ASSERT_EQ(stashedState.getField("connectionId").numberLong(), 0);
ASSERT_EQ(stashedState.getField("appName").valueStringData().toString(), "appName");
ASSERT_BSONOBJ_EQ(stashedState.getField("clientMetadata").Obj(), obj.getField("client").Obj());
ASSERT_EQ(stashedState.getField("waitingForLock").boolean(), false);
ASSERT_EQ(stashedState.getField("active").boolean(), false);
// For the following time metrics, we are only verifying that the transaction sub-document is
// being constructed correctly with proper types because we have other tests to verify that the
// values are being tracked correctly.
ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0);
// Unset the read concern on the OperationContext. This is needed to unstash.
repl::ReadConcernArgs::get(opCtx()) = repl::ReadConcernArgs();
// Unstash the stashed resources. This restores the original Locker and RecoveryUnit to the
// OperationContext.
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
ASSERT(opCtx()->getWriteUnitOfWork());
// With the resources unstashed, verify that the Session reports an empty stashed state.
ASSERT(txnParticipant->reportStashedState().isEmpty());
// Commit the transaction. This allows us to release locks.
txnParticipant->commitUnpreparedTransaction(opCtx());
}
TEST_F(TransactionsMetricsTest, ReportUnstashedResources) {
auto clockSource = initMockPreciseClockSource();
auto startTime = Date_t::now();
clockSource->reset(startTime);
ASSERT(opCtx()->lockState());
ASSERT(opCtx()->recoveryUnit());
const auto autocommit = false;
OperationContextSessionMongod opCtxSession(opCtx(), true, autocommit, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
// Perform initial unstash which sets up a WriteUnitOfWork.
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
ASSERT(opCtx()->getWriteUnitOfWork());
ASSERT(opCtx()->lockState()->isLocked());
// Verify that the Session's report of its own unstashed state aligns with our expectations.
BSONObjBuilder unstashedStateBuilder;
txnParticipant->reportUnstashedState(repl::ReadConcernArgs::get(opCtx()),
&unstashedStateBuilder);
auto unstashedState = unstashedStateBuilder.obj();
auto transactionDocument = unstashedState.getObjectField("transaction");
auto parametersDocument = transactionDocument.getObjectField("parameters");
ASSERT_EQ(parametersDocument.getField("txnNumber").numberLong(), *opCtx()->getTxnNumber());
ASSERT_EQ(parametersDocument.getField("autocommit").boolean(), autocommit);
ASSERT_BSONELT_EQ(parametersDocument.getField("readConcern"),
readConcernArgs.toBSON().getField("readConcern"));
ASSERT_GTE(transactionDocument.getField("readTimestamp").timestamp(), Timestamp(0, 0));
ASSERT_EQ(
dateFromISOString(transactionDocument.getField("startWallClockTime").valueStringData())
.getValue(),
startTime);
ASSERT_EQ(
dateFromISOString(transactionDocument.getField("expiryTime").valueStringData()).getValue(),
startTime + stdx::chrono::seconds{transactionLifetimeLimitSeconds.load()});
// For the following time metrics, we are only verifying that the transaction sub-document is
// being constructed correctly with proper types because we have other tests to verify that
// the values are being tracked correctly.
ASSERT_GTE(transactionDocument.getField("timeOpenMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeActiveMicros").numberLong(), 0);
ASSERT_GTE(transactionDocument.getField("timeInactiveMicros").numberLong(), 0);
// Stash resources. The original Locker and RecoveryUnit now belong to the stash.
txnParticipant->stashTransactionResources(opCtx());
ASSERT(!opCtx()->getWriteUnitOfWork());
// With the resources stashed, verify that the Session reports an empty unstashed state.
BSONObjBuilder builder;
txnParticipant->reportUnstashedState(repl::ReadConcernArgs::get(opCtx()), &builder);
ASSERT(builder.obj().isEmpty());
}
TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) {
ASSERT(opCtx()->lockState());
ASSERT(opCtx()->recoveryUnit());
OperationContextSessionMongod opCtxSession(opCtx(), true, boost::none, boost::none);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "find");
// Build a BSONObj containing the details which we expect to see reported when we call
// Session::reportUnstashedState. For a retryable write, we should only include the txnNumber.
BSONObjBuilder reportBuilder;
BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction"));
BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));
parametersBuilder.append("txnNumber", *opCtx()->getTxnNumber());
parametersBuilder.done();
transactionBuilder.done();
// Verify that the Session's report of its own unstashed state aligns with our expectations.
BSONObjBuilder unstashedStateBuilder;
txnParticipant->reportUnstashedState(repl::ReadConcernArgs::get(opCtx()),
&unstashedStateBuilder);
ASSERT_BSONOBJ_EQ(unstashedStateBuilder.obj(), reportBuilder.obj());
}
namespace {
/*
* Constructs a ClientMetadata BSONObj with the given application name.
*/
BSONObj constructClientMetadata(StringData appName) {
BSONObjBuilder builder;
ASSERT_OK(ClientMetadata::serializePrivate("driverName",
"driverVersion",
"osType",
"osName",
"osArchitecture",
"osVersion",
appName,
&builder));
return builder.obj();
}
} // namespace
TEST_F(TransactionsMetricsTest, LastClientInfoShouldUpdateUponStash) {
// Create a ClientMetadata object and set it on ClientMetadataIsMasterState.
auto obj = constructClientMetadata("appName");
auto clientMetadata = ClientMetadata::parse(obj["client"]);
auto& clientMetadataIsMasterState = ClientMetadataIsMasterState::get(opCtx()->getClient());
clientMetadataIsMasterState.setClientMetadata(opCtx()->getClient(),
std::move(clientMetadata.getValue()));
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
// LastClientInfo should have been set.
auto lastClientInfo = txnParticipant->getSingleTransactionStats().getLastClientInfo();
ASSERT_EQ(lastClientInfo.clientHostAndPort, "");
ASSERT_EQ(lastClientInfo.connectionId, 0);
ASSERT_EQ(lastClientInfo.appName, "appName");
ASSERT_BSONOBJ_EQ(lastClientInfo.clientMetadata, obj.getField("client").Obj());
// Create another ClientMetadata object.
auto newObj = constructClientMetadata("newAppName");
auto newClientMetadata = ClientMetadata::parse(newObj["client"]);
clientMetadataIsMasterState.setClientMetadata(opCtx()->getClient(),
std::move(newClientMetadata.getValue()));
txnParticipant->unstashTransactionResources(opCtx(), "insert");
txnParticipant->stashTransactionResources(opCtx());
// LastClientInfo's clientMetadata should have been updated to the new ClientMetadata object.
lastClientInfo = txnParticipant->getSingleTransactionStats().getLastClientInfo();
ASSERT_EQ(lastClientInfo.appName, "newAppName");
ASSERT_BSONOBJ_EQ(lastClientInfo.clientMetadata, newObj.getField("client").Obj());
}
TEST_F(TransactionsMetricsTest, LastClientInfoShouldUpdateUponCommit) {
// Create a ClientMetadata object and set it on ClientMetadataIsMasterState.
auto obj = constructClientMetadata("appName");
auto clientMetadata = ClientMetadata::parse(obj["client"]);
auto& clientMetadataIsMasterState = ClientMetadataIsMasterState::get(opCtx()->getClient());
clientMetadataIsMasterState.setClientMetadata(opCtx()->getClient(),
std::move(clientMetadata.getValue()));
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
// The transaction machinery cannot store an empty locker.
Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow);
txnParticipant->commitUnpreparedTransaction(opCtx());
// LastClientInfo should have been set.
auto lastClientInfo = txnParticipant->getSingleTransactionStats().getLastClientInfo();
ASSERT_EQ(lastClientInfo.clientHostAndPort, "");
ASSERT_EQ(lastClientInfo.connectionId, 0);
ASSERT_EQ(lastClientInfo.appName, "appName");
ASSERT_BSONOBJ_EQ(lastClientInfo.clientMetadata, obj.getField("client").Obj());
}
TEST_F(TransactionsMetricsTest, LastClientInfoShouldUpdateUponAbort) {
// Create a ClientMetadata object and set it on ClientMetadataIsMasterState.
auto obj = constructClientMetadata("appName");
auto clientMetadata = ClientMetadata::parse(obj["client"]);
auto& clientMetadataIsMasterState = ClientMetadataIsMasterState::get(opCtx()->getClient());
clientMetadataIsMasterState.setClientMetadata(opCtx()->getClient(),
std::move(clientMetadata.getValue()));
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
txnParticipant->abortActiveTransaction(opCtx());
// LastClientInfo should have been set.
auto lastClientInfo = txnParticipant->getSingleTransactionStats().getLastClientInfo();
ASSERT_EQ(lastClientInfo.clientHostAndPort, "");
ASSERT_EQ(lastClientInfo.connectionId, 0);
ASSERT_EQ(lastClientInfo.appName, "appName");
ASSERT_BSONOBJ_EQ(lastClientInfo.clientMetadata, obj.getField("client").Obj());
}
/*
* Sets up the additive metrics for Transactions Metrics test.
*/
void setupAdditiveMetrics(const int metricValue, OperationContext* opCtx) {
CurOp::get(opCtx)->debug().additiveMetrics.keysExamined = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.docsExamined = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.nMatched = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.nModified = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.ninserted = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.ndeleted = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.nmoved = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.keysInserted = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.keysDeleted = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.prepareReadConflicts = metricValue;
CurOp::get(opCtx)->debug().additiveMetrics.writeConflicts = metricValue;
}
/*
* Builds expected parameters info string.
*/
void buildParametersInfoString(StringBuilder* sb,
LogicalSessionId sessionId,
const TxnNumber txnNum,
const repl::ReadConcernArgs readConcernArgs) {
BSONObjBuilder lsidBuilder;
sessionId.serialize(&lsidBuilder);
(*sb) << "parameters:{ lsid: " << lsidBuilder.done().toString() << ", txnNumber: " << txnNum
<< ", autocommit: false"
<< ", readConcern: " << readConcernArgs.toBSON().getObjectField("readConcern") << " },";
}
/*
* Builds expected single transaction stats info string.
*/
void buildSingleTransactionStatsString(StringBuilder* sb, const int metricValue) {
(*sb) << " keysExamined:" << metricValue << " docsExamined:" << metricValue
<< " nMatched:" << metricValue << " nModified:" << metricValue
<< " ninserted:" << metricValue << " ndeleted:" << metricValue
<< " nmoved:" << metricValue << " keysInserted:" << metricValue
<< " keysDeleted:" << metricValue << " prepareReadConflicts:" << metricValue
<< " writeConflicts:" << metricValue;
}
/*
* Builds the time active and time inactive info string.
*/
void buildTimeActiveInactiveString(StringBuilder* sb,
TransactionParticipant* txnParticipant,
TickSource* tickSource,
TickSource::Tick curTick) {
// Add time active micros to string.
(*sb) << " timeActiveMicros:"
<< durationCount(
txnParticipant->getSingleTransactionStats().getTimeActiveMicros(tickSource,
curTick));
// Add time inactive micros to string.
(*sb) << " timeInactiveMicros:"
<< durationCount(
txnParticipant->getSingleTransactionStats().getTimeInactiveMicros(tickSource,
curTick));
}
/*
* Builds the entire expected transaction info string and returns it.
*/
std::string buildTransactionInfoString(OperationContext* opCtx,
TransactionParticipant* txnParticipant,
std::string terminationCause,
const LogicalSessionId sessionId,
const TxnNumber txnNum,
const int metricValue) {
// Calling transactionInfoForLog to get the actual transaction info string.
const auto lockerInfo =
opCtx->lockState()->getLockerInfo(CurOp::get(*opCtx)->getLockStatsBase());
// Building expected transaction info string.
StringBuilder parametersInfo;
buildParametersInfoString(
¶metersInfo, sessionId, txnNum, repl::ReadConcernArgs::get(opCtx));
StringBuilder readTimestampInfo;
readTimestampInfo
<< " readTimestamp:"
<< txnParticipant->getSpeculativeTransactionReadOpTimeForTest().getTimestamp().toString()
<< ",";
StringBuilder singleTransactionStatsInfo;
buildSingleTransactionStatsString(&singleTransactionStatsInfo, metricValue);
auto tickSource = opCtx->getServiceContext()->getTickSource();
StringBuilder timeActiveAndInactiveInfo;
buildTimeActiveInactiveString(
&timeActiveAndInactiveInfo, txnParticipant, tickSource, tickSource->getTicks());
BSONObjBuilder locks;
if (lockerInfo) {
lockerInfo->stats.report(&locks);
}
// Puts all the substrings together into one expected info string. The expected info string will
// look something like this:
// parameters:{ lsid: { id: UUID("f825288c-100e-49a1-9fd7-b95c108049e6"), uid: BinData(0,
// E3B0C44298FC1C149AFBF4C8996FB92427AE41E4649B934CA495991B7852B855) }, txnNumber: 1,
// autocommit: false }, readTimestamp:Timestamp(0, 0), keysExamined:1 docsExamined:1 nMatched:1
// nModified:1 ninserted:1 ndeleted:1 nmoved:1 keysInserted:1 keysDeleted:1
// prepareReadConflicts:1 writeConflicts:1 terminationCause:committed timeActiveMicros:3
// timeInactiveMicros:2 numYields:0 locks:{ Global: { acquireCount: { r: 6, w: 4 } }, Database:
// { acquireCount: { r: 1, w: 1, W: 2 } }, Collection: { acquireCount: { R: 1 } }, oplog: {
// acquireCount: { W: 1 } } } 0ms
StringBuilder expectedTransactionInfo;
expectedTransactionInfo << parametersInfo.str() << readTimestampInfo.str()
<< singleTransactionStatsInfo.str()
<< " terminationCause:" << terminationCause
<< timeActiveAndInactiveInfo.str() << " numYields:" << 0
<< " locks:" << locks.done().toString() << " "
<< duration_cast(
txnParticipant->getSingleTransactionStats().getDuration(
tickSource, tickSource->getTicks()));
return expectedTransactionInfo.str();
}
TEST_F(TransactionsMetricsTest, TestTransactionInfoForLogAfterCommit) {
// Initialize SingleTransactionStats AdditiveMetrics objects.
const int metricValue = 1;
setupAdditiveMetrics(metricValue, opCtx());
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
txnParticipant->commitUnpreparedTransaction(opCtx());
const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none);
ASSERT(lockerInfo);
std::string testTransactionInfo =
txnParticipant->transactionInfoForLogForTest(&lockerInfo->stats, true, readConcernArgs);
std::string expectedTransactionInfo =
buildTransactionInfoString(opCtx(),
txnParticipant,
"committed",
*opCtx()->getLogicalSessionId(),
*opCtx()->getTxnNumber(),
metricValue);
ASSERT_EQ(testTransactionInfo, expectedTransactionInfo);
}
TEST_F(TransactionsMetricsTest, TestTransactionInfoForLogAfterAbort) {
// Initialize SingleTransactionStats AdditiveMetrics objects.
const int metricValue = 1;
setupAdditiveMetrics(metricValue, opCtx());
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
txnParticipant->abortActiveTransaction(opCtx());
const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none);
ASSERT(lockerInfo);
std::string testTransactionInfo =
txnParticipant->transactionInfoForLogForTest(&lockerInfo->stats, false, readConcernArgs);
std::string expectedTransactionInfo =
buildTransactionInfoString(opCtx(),
txnParticipant,
"aborted",
*opCtx()->getLogicalSessionId(),
*opCtx()->getTxnNumber(),
metricValue);
ASSERT_EQ(testTransactionInfo, expectedTransactionInfo);
}
DEATH_TEST_F(TransactionsMetricsTest, TestTransactionInfoForLogWithNoLockerInfoStats, "invariant") {
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none);
ASSERT(lockerInfo);
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
txnParticipant->commitUnpreparedTransaction(opCtx());
txnParticipant->transactionInfoForLogForTest(nullptr, true, readConcernArgs);
}
TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowCommit) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize SingleTransactionStats AdditiveMetrics objects.
const int metricValue = 1;
setupAdditiveMetrics(metricValue, opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "commitTransaction");
serverGlobalParams.slowMS = 10;
tickSource->advance(Microseconds(11 * 1000));
startCapturingLogMessages();
txnParticipant->commitUnpreparedTransaction(opCtx());
stopCapturingLogMessages();
const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none);
ASSERT(lockerInfo);
std::string expectedTransactionInfo = "transaction " +
txnParticipant->transactionInfoForLogForTest(&lockerInfo->stats, true, readConcernArgs);
ASSERT_EQUALS(1, countLogLinesContaining(expectedTransactionInfo));
}
TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize SingleTransactionStats AdditiveMetrics objects.
const int metricValue = 1;
setupAdditiveMetrics(metricValue, opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "abortTransaction");
serverGlobalParams.slowMS = 10;
tickSource->advance(Microseconds(11 * 1000));
startCapturingLogMessages();
txnParticipant->abortActiveTransaction(opCtx());
stopCapturingLogMessages();
const auto lockerInfo = opCtx()->lockState()->getLockerInfo(boost::none);
ASSERT(lockerInfo);
std::string expectedTransactionInfo = "transaction " +
txnParticipant->transactionInfoForLogForTest(&lockerInfo->stats, false, readConcernArgs);
ASSERT_EQUALS(1, countLogLinesContaining(expectedTransactionInfo));
}
TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) {
auto tickSource = initMockTickSource();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
repl::ReadConcernArgs readConcernArgs;
ASSERT_OK(readConcernArgs.initialize(BSON("find"
<< "test"
<< repl::ReadConcernArgs::kReadConcernFieldName
<< BSON(repl::ReadConcernArgs::kLevelFieldName
<< "snapshot"))));
repl::ReadConcernArgs::get(opCtx()) = readConcernArgs;
auto txnParticipant = TransactionParticipant::get(opCtx());
// Initialize SingleTransactionStats AdditiveMetrics objects.
const int metricValue = 1;
setupAdditiveMetrics(metricValue, opCtx());
txnParticipant->unstashTransactionResources(opCtx(), "insert");
{ Lock::GlobalLock lk(opCtx(), MODE_IX, Date_t::now(), Lock::InterruptBehavior::kThrow); }
txnParticipant->stashTransactionResources(opCtx());
const auto txnResourceStashLocker = txnParticipant->getTxnResourceStashLockerForTest();
ASSERT(txnResourceStashLocker);
const auto lockerInfo = txnResourceStashLocker->getLockerInfo(boost::none);
serverGlobalParams.slowMS = 10;
tickSource->advance(Microseconds(11 * 1000));
startCapturingLogMessages();
txnParticipant->abortArbitraryTransaction();
stopCapturingLogMessages();
std::string expectedTransactionInfo = "transaction " +
txnParticipant->transactionInfoForLogForTest(&lockerInfo->stats, false, readConcernArgs);
ASSERT_EQUALS(1, countLogLinesContaining(expectedTransactionInfo));
}
TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) {
auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Check that there are no Timestamps in the set.
unsigned int zero = 0;
ASSERT_EQ(totalActiveTxnTS, zero);
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
auto firstPrepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
// Check that we added a Timestamp to the set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
// totalActiveTxnTS = 1
totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
// Check that the oldest prepareTimestamp is equal to firstPrepareTimestamp because there is
// only one prepared transaction on this Service.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), firstPrepareTimestamp);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
txnParticipant->stashTransactionResources(opCtx());
auto originalClient = Client::releaseCurrent();
/**
* Make a new Session, Client, OperationContext and transaction.
*/
auto service = opCtx()->getServiceContext();
auto newClientOwned = service->makeClient("newClient");
auto newClient = newClientOwned.get();
Client::setCurrent(std::move(newClientOwned));
const TxnNumber newTxnNum = 10;
const auto newSessionId = makeLogicalSessionIdForTest();
auto secondPrepareTimestamp = Timestamp();
{
auto newOpCtx = newClient->makeOperationContext();
newOpCtx.get()->setLogicalSessionId(newSessionId);
newOpCtx.get()->setTxnNumber(newTxnNum);
OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true);
auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction");
// secondPrepareTimestamp should be greater than firstPreparedTimestamp because this
// transaction was prepared after.
secondPrepareTimestamp = newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
ASSERT_GT(secondPrepareTimestamp, firstPrepareTimestamp);
// Check that we added a Timestamp to the set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
totalActiveTxnTS + 1);
// totalActiveTxnTS = 2
totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
// The oldest prepareTimestamp should still be firstPrepareTimestamp.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(),
firstPrepareTimestamp);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
}
Client::releaseCurrent();
Client::setCurrent(std::move(originalClient));
// Switch clients and abort the first transaction. This should cause the oldestActiveTS to be
// equal to the secondPrepareTimestamp.
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), secondPrepareTimestamp);
}
TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) {
auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
auto txnParticipant = TransactionParticipant::get(opCtx());
// Check that there are no Timestamps in the set.
unsigned int zero = 0;
ASSERT_EQ(totalActiveTxnTS, zero);
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->prepareTransaction(opCtx(), {});
// Check that we added a Timestamp to the set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
// totalActiveTxnTS = 1
totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
ASSERT_FALSE(txnParticipant->transactionIsAborted());
txnParticipant->stashTransactionResources(opCtx());
auto originalClient = Client::releaseCurrent();
/**
* Make a new Session, Client, OperationContext and transaction.
*/
auto service = opCtx()->getServiceContext();
auto newClientOwned = service->makeClient("newClient");
auto newClient = newClientOwned.get();
Client::setCurrent(std::move(newClientOwned));
const TxnNumber newTxnNum = 10;
const auto newSessionId = makeLogicalSessionIdForTest();
{
auto newOpCtx = newClient->makeOperationContext();
newOpCtx.get()->setLogicalSessionId(newSessionId);
newOpCtx.get()->setTxnNumber(newTxnNum);
OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true);
auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction");
// secondPrepareTimestamp should be greater than firstPreparedTimestamp because this
// transaction was prepared after.
newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
// Check that we added a Timestamp to the set.
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
totalActiveTxnTS + 1);
// totalActiveTxnTS = 2
totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
// The oldest prepareTimestamp should still be firstPrepareTimestamp.
ASSERT_FALSE(txnParticipant->transactionIsAborted());
// Abort this transaction and check that we have decremented the total active timestamps
// count.
newTxnParticipant->abortActiveTransaction(newOpCtx.get());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
totalActiveTxnTS - 1);
}
Client::releaseCurrent();
Client::setCurrent(std::move(originalClient));
// totalActiveTxnTS = 1
totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
// Switch clients and abort the first transaction. This means we no longer have an oldest active
// timestamp.
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
}
} // namespace
} // namespace mongo