summaryrefslogtreecommitdiff
path: root/src
diff options
context:
space:
mode:
authorKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-12-14 15:30:08 -0500
committerKaloian Manassiev <kaloian.manassiev@mongodb.com>2018-12-18 22:27:32 -0500
commit20d857c7609bd2e7cbba5649bd943a7073ea1509 (patch)
tree0e9b72019fa3ecf9653d1522d2086b4e92123e81 /src
parent4f1e7fb94ce21e21b05dd49c4ed46e6c5231bd2a (diff)
downloadmongo-20d857c7609bd2e7cbba5649bd943a7073ea1509.tar.gz
SERVER-38677 Remove the invalidations counter from TransactionParticipant
Diffstat (limited to 'src')
-rw-r--r--src/mongo/db/op_observer_impl_test.cpp4
-rw-r--r--src/mongo/db/operation_context.cpp2
-rw-r--r--src/mongo/db/operation_context.h11
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination.cpp2
-rw-r--r--src/mongo/db/s/session_catalog_migration_destination_test.cpp2
-rw-r--r--src/mongo/db/service_entry_point_common.cpp7
-rw-r--r--src/mongo/db/session_catalog_mongod.cpp2
-rw-r--r--src/mongo/db/transaction_participant.cpp73
-rw-r--r--src/mongo/db/transaction_participant.h102
-rw-r--r--src/mongo/db/transaction_participant_retryable_writes_test.cpp170
-rw-r--r--src/mongo/db/transaction_participant_test.cpp4
11 files changed, 180 insertions, 199 deletions
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp
index 7110ff5aeb8..1fb9711b7c2 100644
--- a/src/mongo/db/op_observer_impl_test.cpp
+++ b/src/mongo/db/op_observer_impl_test.cpp
@@ -349,7 +349,7 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
// Create a session and sync it from disk
auto session = sessionCatalog->checkOutSession(opCtx.get());
const auto txnParticipant = TransactionParticipant::get(session.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
+ txnParticipant->refreshFromStorageIfNeeded();
// Simulate a write occurring on that session
simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
@@ -398,7 +398,7 @@ TEST_F(OpObserverSessionCatalogRollbackTest,
// Create a session and sync it from disk
auto session = sessionCatalog->checkOutSession(opCtx.get());
const auto txnParticipant = TransactionParticipant::get(session.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx.get());
+ txnParticipant->refreshFromStorageIfNeeded();
// Simulate a write occurring on that session
simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId);
diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp
index 833eb145938..88af8855212 100644
--- a/src/mongo/db/operation_context.cpp
+++ b/src/mongo/db/operation_context.cpp
@@ -85,6 +85,8 @@ OperationContext::OperationContext(Client* client, unsigned int opId)
_elapsedTime(client ? client->getServiceContext()->getTickSource()
: SystemTickSource::get()) {}
+OperationContext::~OperationContext() = default;
+
void OperationContext::setDeadlineAndMaxTime(Date_t when,
Microseconds maxTime,
ErrorCodes::Error timeoutError) {
diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h
index 92845652984..df8694b0e05 100644
--- a/src/mongo/db/operation_context.h
+++ b/src/mongo/db/operation_context.h
@@ -79,8 +79,7 @@ class OperationContext : public Interruptible, public Decorable<OperationContext
public:
OperationContext(Client* client, unsigned int opId);
-
- virtual ~OperationContext() = default;
+ virtual ~OperationContext();
/**
* Interface for durability. Caller DOES NOT own pointer.
@@ -164,7 +163,7 @@ public:
/**
* Returns the session ID associated with this operation, if there is one.
*/
- boost::optional<LogicalSessionId> getLogicalSessionId() const {
+ const boost::optional<LogicalSessionId>& getLogicalSessionId() const {
return _lsid;
}
@@ -407,7 +406,9 @@ private:
friend class WriteUnitOfWork;
friend class repl::UnreplicatedWritesBlock;
+
Client* const _client;
+
const unsigned int _opId;
boost::optional<LogicalSessionId> _lsid;
@@ -447,8 +448,8 @@ private:
WriteConcernOptions _writeConcern;
- Date_t _deadline =
- Date_t::max(); // The timepoint at which this operation exceeds its time limit.
+ // The timepoint at which this operation exceeds its time limit.
+ Date_t _deadline = Date_t::max();
ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit;
bool _ignoreInterrupts = false;
diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp
index f6a75f6f772..036e319c59e 100644
--- a/src/mongo/db/s/session_catalog_migration_destination.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination.cpp
@@ -249,7 +249,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx,
auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId);
auto const txnParticipant = TransactionParticipant::get(scopedSession.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx);
+ txnParticipant->refreshFromStorageIfNeeded();
if (!txnParticipant->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) {
// Don't continue migrating the transaction history
diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
index 75f4f390fe4..ffc5cd400f1 100644
--- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp
+++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp
@@ -1589,7 +1589,7 @@ TEST_F(SessionCatalogMigrationDestinationTest,
auto scopedSession =
SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId());
const auto txnParticipant = TransactionParticipant::get(scopedSession.get());
- txnParticipant->refreshFromStorageIfNeeded(opCtx);
+ txnParticipant->refreshFromStorageIfNeeded();
txnParticipant->beginOrContinue(3, boost::none, boost::none);
}
diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp
index 380e7784bdb..e365b2bdafe 100644
--- a/src/mongo/db/service_entry_point_common.cpp
+++ b/src/mongo/db/service_entry_point_common.cpp
@@ -370,17 +370,16 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx,
if (sessionOptions.getCoordinator() == boost::optional<bool>(true)) {
createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber());
}
- }
-
- if (txnParticipant->inMultiDocumentTransaction() && !sessionOptions.getStartTransaction()) {
+ } else if (txnParticipant->inMultiDocumentTransaction()) {
const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Only the first command in a transaction may specify a readConcern",
readConcernArgs.isEmpty());
}
+
+ txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName());
}
- txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName());
ScopeGuard guard = MakeGuard([&txnParticipant, opCtx]() {
txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx);
});
diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp
index 9ef31bc588b..c2ef072d72a 100644
--- a/src/mongo/db/session_catalog_mongod.cpp
+++ b/src/mongo/db/session_catalog_mongod.cpp
@@ -194,7 +194,7 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o
: _operationContextSession(opCtx) {
if (!opCtx->getClient()->isInDirectClient()) {
const auto txnParticipant = TransactionParticipant::get(opCtx);
- txnParticipant->refreshFromStorageIfNeeded(opCtx);
+ txnParticipant->refreshFromStorageIfNeeded();
}
}
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 5f67b995cf7..5929c41f752 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -306,6 +306,13 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const {
return owningSession->getSessionId();
}
+OperationContext* TransactionParticipant::_opCtx() const {
+ const auto* owningSession = getTransactionParticipant.owner(this);
+ auto* opCtx = owningSession->currentOperation();
+ invariant(opCtx);
+ return opCtx;
+}
+
void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) {
if (txnNumber > _activeTxnNumber) {
// New retryable write.
@@ -697,7 +704,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
return;
}
- _checkIsCommandValidWithTxnState(lg, opCtx, cmdName);
+ _checkIsCommandValidWithTxnState(lg, *opCtx->getTxnNumber(), cmdName);
if (_txnResourceStash) {
// Transaction resources already exist for this transaction. Transfer them from the
@@ -713,6 +720,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// If we have no transaction resources then we cannot be prepared. If we're not in progress,
// we don't do anything else.
invariant(!_txnState.isPrepared(lg));
+
if (!_txnState.isInProgress(lg)) {
// At this point we're either committed and this is a 'commitTransaction' command, or we
// are in the process of committing.
@@ -1115,6 +1123,7 @@ bool TransactionParticipant::expired() const {
void TransactionParticipant::abortActiveTransaction(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lock(_mutex);
+
// This function shouldn't throw if the transaction is already aborted.
_checkIsActiveTransaction(lock, *opCtx->getTxnNumber(), false);
_abortActiveTransaction(
@@ -1186,9 +1195,11 @@ void TransactionParticipant::_abortActiveTransaction(stdx::unique_lock<stdx::mut
// unlock the session to run the opObserver onTransactionAbort, which calls back into the
// session.
lock.unlock();
+
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
opObserver->onTransactionAbort(opCtx, abortOplogSlot);
+
lock.lock();
// We do not check if the active transaction number is correct here because we handle it below.
@@ -1297,17 +1308,17 @@ void TransactionParticipant::_checkIsActiveTransaction(WithLock wl,
}
void TransactionParticipant::_checkIsCommandValidWithTxnState(WithLock wl,
- OperationContext* opCtx,
+ const TxnNumber& requestTxnNumber,
const std::string& cmdName) {
// Throw NoSuchTransaction error instead of TransactionAborted error since this is the entry
// point of transaction execution.
uassert(ErrorCodes::NoSuchTransaction,
- str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.",
+ str::stream() << "Transaction " << requestTxnNumber << " has been aborted.",
!_txnState.isAborted(wl));
// Cannot change committed transaction but allow retrying commitTransaction command.
uassert(ErrorCodes::TransactionCommitted,
- str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been committed.",
+ str::stream() << "Transaction " << requestTxnNumber << " has been committed.",
cmdName == "commitTransaction" || !_txnState.isCommitted(wl));
// Disallow operations other than abort, prepare or commit on a prepared transaction
@@ -1486,7 +1497,7 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl,
std::string TransactionParticipant::_transactionInfoForLog(
const SingleThreadedLockStats* lockStats,
TransactionState::StateFlag terminationCause,
- repl::ReadConcernArgs readConcernArgs) {
+ repl::ReadConcernArgs readConcernArgs) const {
invariant(lockStats);
invariant(terminationCause == TransactionState::kCommitted ||
terminationCause == TransactionState::kAborted);
@@ -1596,45 +1607,34 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
_transactionMetricsObserver.resetSingleTransactionStats(txnNumber);
}
-void TransactionParticipant::refreshFromStorageIfNeeded(OperationContext* opCtx) {
- if (opCtx->getClient()->isInDirectClient()) {
- return;
- }
-
+void TransactionParticipant::refreshFromStorageIfNeeded() {
+ const auto opCtx = _opCtx();
+ invariant(!opCtx->getClient()->isInDirectClient());
invariant(!opCtx->lockState()->isLocked());
- stdx::unique_lock<stdx::mutex> ul(_mutex);
-
- while (!_isValid) {
- const int numInvalidations = _numInvalidations;
-
- ul.unlock();
+ if (_isValid)
+ return;
- auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId());
+ auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId());
- ul.lock();
+ stdx::lock_guard<stdx::mutex> lg(_mutex);
- // Protect against concurrent refreshes or invalidations
- if (!_isValid && _numInvalidations == numInvalidations) {
- _isValid = true;
- _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
+ _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord);
- if (_lastWrittenSessionRecord) {
- _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
- _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
- _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
+ if (_lastWrittenSessionRecord) {
+ _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum();
+ _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements);
+ _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory;
- if (activeTxnHistory.transactionCommitted) {
- _txnState.transitionTo(
- ul,
- TransactionState::kCommitted,
- TransactionState::TransitionValidation::kRelaxTransitionValidation);
- }
- }
-
- break;
+ if (activeTxnHistory.transactionCommitted) {
+ _txnState.transitionTo(
+ lg,
+ TransactionState::kCommitted,
+ TransactionState::TransitionValidation::kRelaxTransitionValidation);
}
}
+
+ _isValid = true;
}
void TransactionParticipant::onWriteOpCompletedOnPrimary(
@@ -1723,9 +1723,8 @@ void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx
}
void TransactionParticipant::_invalidate(WithLock) {
- _activeTxnNumber = kUninitializedTxnNumber;
_isValid = false;
- _numInvalidations++;
+ _activeTxnNumber = kUninitializedTxnNumber;
_lastWrittenSessionRecord.reset();
// Reset the transactions metrics.
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 7c493bae0d9..0f41881fd01 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -92,7 +92,6 @@ public:
* Ephemerally holds the Client lock associated with opCtx.
*/
TxnResources(OperationContext* opCtx, bool keepTicket = false);
-
~TxnResources();
// Rule of 5: because we have a class-defined destructor, we need to explictly specify
@@ -283,33 +282,20 @@ public:
* If this session is holding stashed locks in _txnResourceStash, reports the current state of
* the session using the provided builder. Locks the session object's mutex while running.
*/
+ BSONObj reportStashedState() const;
void reportStashedState(BSONObjBuilder* builder) const;
- std::string transactionInfoForLogForTest(const SingleThreadedLockStats* lockStats,
- bool committed,
- repl::ReadConcernArgs readConcernArgs) {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- TransactionState::StateFlag terminationCause =
- committed ? TransactionState::kCommitted : TransactionState::kAborted;
- return _transactionInfoForLog(lockStats, terminationCause, readConcernArgs);
- }
-
/**
* If this session is not holding stashed locks in _txnResourceStash (transaction is active),
* reports the current state of the session using the provided builder. Locks the session
* object's mutex while running.
+ *
* If this is called from a thread other than the owner of the opCtx, that thread must be
* holding the client lock.
*/
void reportUnstashedState(OperationContext* opCtx, BSONObjBuilder* builder) const;
/**
- * Convenience method which creates and populates a BSONObj containing the stashed state.
- * Returns an empty BSONObj if this session has no stashed resources.
- */
- BSONObj reportStashedState() const;
-
- /**
* Aborts the transaction outside the transaction, releasing transaction resources.
*
* Not called with session checked out.
@@ -385,16 +371,6 @@ public:
*/
void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber);
- void transitionToPreparedforTest() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _txnState.transitionTo(lk, TransactionState::kPrepared);
- }
-
- void transitionToAbortedforTest() {
- stdx::lock_guard<stdx::mutex> lk(_mutex);
- _txnState.transitionTo(lk, TransactionState::kAborted);
- }
-
/**
* Blocking method, which loads the transaction state from storage if it has been marked as
* needing refresh.
@@ -402,7 +378,7 @@ public:
* In order to avoid the possibility of deadlock, this method must not be called while holding a
* lock.
*/
- void refreshFromStorageIfNeeded(OperationContext* opCtx);
+ void refreshFromStorageIfNeeded();
TxnNumber getActiveTxnNumber() const {
stdx::lock_guard<stdx::mutex> lg(_mutex);
@@ -500,6 +476,25 @@ public:
*/
bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const;
+ std::string transactionInfoForLogForTest(const SingleThreadedLockStats* lockStats,
+ bool committed,
+ repl::ReadConcernArgs readConcernArgs) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ TransactionState::StateFlag terminationCause =
+ committed ? TransactionState::kCommitted : TransactionState::kAborted;
+ return _transactionInfoForLog(lockStats, terminationCause, readConcernArgs);
+ }
+
+ void transitionToPreparedforTest() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _txnState.transitionTo(lk, TransactionState::kPrepared);
+ }
+
+ void transitionToAbortedforTest() {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+ _txnState.transitionTo(lk, TransactionState::kAborted);
+ }
+
private:
/**
* Reserves a slot in the oplog with an open storage-transaction while it is alive. Reserves the
@@ -619,6 +614,9 @@ private:
// Shortcut to obtain the id of the session under which this participant runs
const LogicalSessionId& _sessionId() const;
+ // Shortcut to obtain the currently checked-out operation context under this participant runs
+ OperationContext* _opCtx() const;
+
/**
* Performing any checks based on the in-memory state of the TransactionParticipant requires
* that the object is fully in sync with its on-disk representation in the transactions table.
@@ -701,7 +699,7 @@ private:
// Checks if the command can be run on this transaction based on the state of the transaction.
void _checkIsCommandValidWithTxnState(WithLock,
- OperationContext* opCtx,
+ const TxnNumber& requestTxnNumber,
const std::string& cmdName);
// Logs the transaction information if it has run slower than the global parameter slowMS. The
@@ -717,7 +715,7 @@ private:
// passed in order for this method to be called.
std::string _transactionInfoForLog(const SingleThreadedLockStats* lockStats,
TransactionState::StateFlag terminationCause,
- repl::ReadConcernArgs readConcernArgs);
+ repl::ReadConcernArgs readConcernArgs) const;
// Reports transaction stats for both active and inactive transactions using the provided
// builder. The lock may be either a lock on _mutex or a lock on _metricsMutex.
@@ -752,8 +750,6 @@ private:
// Protects the member variables below.
mutable stdx::mutex _mutex;
- bool _inShutdown{false};
-
// Holds transaction resources between network operations.
boost::optional<TxnResources> _txnResourceStash;
@@ -790,8 +786,32 @@ private:
// should wait for write concern for on commit.
repl::OpTime _speculativeTransactionReadOpTime;
+ // Contains uncommitted multi-key path info entries which were modified under this transaction
+ // so they can be applied to subsequent opreations before the transaction commits
std::vector<MultikeyPathInfo> _multikeyPathInfo;
+ // Tracks the OpTime of the first oplog entry written by this TransactionParticipant.
+ boost::optional<repl::OpTime> _oldestOplogEntryOpTime;
+
+ // Tracks the OpTime of the abort/commit oplog entry associated with this transaction.
+ boost::optional<repl::OpTime> _finishOpTime;
+
+ // Protects _transactionMetricsObserver. The concurrency rules are that const methods on
+ // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for
+ // non-const methods, both mutexes must be held, with _mutex being taken before _metricsMutex.
+ // No other locks, particularly including the Client lock, may be taken while holding
+ // _metricsMutex.
+ mutable stdx::mutex _metricsMutex;
+
+ // Tracks and updates transaction metrics upon the appropriate transaction event.
+ TransactionMetricsObserver _transactionMetricsObserver;
+
+ // Only set if the server is shutting down and it has been ensured that no new requests will be
+ // accepted. Ensures that any transaction resources will not be stashed from the operation
+ // context onto the transaction participant when the session is checked-in so that locks can
+ // automatically get freed.
+ bool _inShutdown{false};
+
//
// Retryable writes state
//
@@ -799,10 +819,6 @@ private:
// Specifies whether the session information needs to be refreshed from storage
bool _isValid{false};
- // Counter, incremented with each call to invalidate in order to discern invalidations, which
- // happen during refresh
- int _numInvalidations{0};
-
// Set to true if incomplete history is detected. For example, when the oplog to a write was
// truncated because it was too old.
bool _hasIncompleteHistory{false};
@@ -814,22 +830,6 @@ private:
// opTime. Used for fast retryability check and retrieving the previous write's data without
// having to scan through the oplog.
CommittedStatementTimestampMap _activeTxnCommittedStatements;
-
- // Protects _transactionMetricsObserver. The concurrency rules are that const methods on
- // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for
- // non-const methods, both mutexes must be held, with _mutex being taken before _metricsMutex.
- // No other locks, particularly including the Client lock, may be taken while holding
- // _metricsMutex.
- mutable stdx::mutex _metricsMutex;
-
- // Tracks and updates transaction metrics upon the appropriate transaction event.
- TransactionMetricsObserver _transactionMetricsObserver;
-
- // Tracks the OpTime of the first oplog entry written by this TransactionParticipant.
- boost::optional<repl::OpTime> _oldestOplogEntryOpTime;
-
- // Tracks the OpTime of the abort/commit oplog entry associated with this transaction.
- boost::optional<repl::OpTime> _finishOpTime;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
index 5914862a591..f3900a43b5f 100644
--- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp
+++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp
@@ -145,15 +145,16 @@ protected:
MongoDSessionCatalog::onStepUp(opCtx());
const auto service = opCtx()->getServiceContext();
- OpObserverRegistry* opObserverRegistry =
- dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
- auto mockObserver = stdx::make_unique<OpObserverMock>();
- _opObserver = mockObserver.get();
- opObserverRegistry->addObserver(std::move(mockObserver));
+
+ const auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver());
+ opObserverRegistry->addObserver(stdx::make_unique<OpObserverMock>());
+
+ opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest());
+ _opContextSession.emplace(opCtx());
}
void tearDown() final {
- _opObserver = nullptr;
+ _opContextSession.reset();
MockReplCoordServerFixture::tearDown();
}
@@ -196,16 +197,16 @@ protected:
OplogSlot());
}
- repl::OpTime writeTxnRecord(Session* session,
- TxnNumber txnNum,
+ repl::OpTime writeTxnRecord(TxnNumber txnNum,
StmtId stmtId,
repl::OpTime prevOpTime,
boost::optional<DurableTxnStateEnum> txnState) {
- const auto uuid = UUID::gen();
-
+ const auto session = OperationContextSession::get(opCtx());
const auto txnParticipant = TransactionParticipant::get(session);
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
+ const auto uuid = UUID::gen();
+
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto opTime =
@@ -217,11 +218,12 @@ protected:
return opTime;
}
- void assertTxnRecord(Session* session,
- TxnNumber txnNum,
+ void assertTxnRecord(TxnNumber txnNum,
StmtId stmtId,
repl::OpTime opTime,
boost::optional<DurableTxnStateEnum> txnState) {
+ const auto session = OperationContextSession::get(opCtx());
+
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
{BSON("_id" << session->getSessionId().toBSON())});
@@ -243,23 +245,21 @@ protected:
ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum));
}
- OpObserverMock* _opObserver = nullptr;
+private:
+ boost::optional<OperationContextSession> _opContextSession;
};
TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 20;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
-
- ASSERT_EQ(sessionId, session.getSessionId());
ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull());
DBDirectClient client(opCtx());
@@ -270,15 +270,14 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin)
}
TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 21;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
- const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none);
+ const auto opTime = writeTxnRecord(txnNum, 0, {}, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -298,13 +297,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit
TEST_F(TransactionParticipantRetryableWritesTest,
StartingNewerTransactionUpdatesThePersistedSession) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
- const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
- const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none);
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
+
+ const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
+ const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, boost::none);
DBDirectClient client(opCtx());
auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace,
@@ -322,33 +321,27 @@ TEST_F(TransactionParticipantRetryableWritesTest,
ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200));
}
TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
-
- const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none);
- assertTxnRecord(&session, 100, 0, firstOpTime, boost::none);
- const auto secondOpTime =
- writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
- assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
- const auto thirdOpTime =
- writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
- assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
- const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none);
- assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none);
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+
+ const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none);
+ assertTxnRecord(100, 0, firstOpTime, boost::none);
+ const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, DurableTxnStateEnum::kPrepared);
+ assertTxnRecord(200, 1, secondOpTime, DurableTxnStateEnum::kPrepared);
+ const auto thirdOpTime = writeTxnRecord(300, 2, secondOpTime, DurableTxnStateEnum::kCommitted);
+ assertTxnRecord(300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted);
+ const auto fourthOpTime = writeTxnRecord(400, 3, thirdOpTime, boost::none);
+ assertTxnRecord(400, 3, fourthOpTime, boost::none);
}
TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 20;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -360,10 +353,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) {
}
TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
// Drop the transactions table
BSONObj dropResult;
@@ -385,29 +378,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN
}
TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
- const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none);
+ const auto firstOpTime = writeTxnRecord(txnNum, 1000, {}, boost::none);
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000));
ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
- writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none);
+ writeTxnRecord(txnNum, 2000, firstOpTime, boost::none);
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000));
// Invalidate the session and ensure the statements still check out
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000));
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000));
@@ -417,10 +408,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) {
}
TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -432,9 +421,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTr
TEST_F(TransactionParticipantRetryableWritesTest,
CheckStatementExecutedForInvalidatedTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
txnParticipant->invalidate();
ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0),
@@ -444,11 +431,10 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryForOldTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ const auto& sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -476,10 +462,8 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -487,7 +471,7 @@ TEST_F(TransactionParticipantRetryableWritesTest,
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
txnParticipant->invalidate();
@@ -499,10 +483,8 @@ TEST_F(TransactionParticipantRetryableWritesTest,
TEST_F(TransactionParticipantRetryableWritesTest,
WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) {
- const auto sessionId = makeLogicalSessionIdForTest();
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
const TxnNumber txnNum = 100;
txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
@@ -511,7 +493,7 @@ TEST_F(TransactionParticipantRetryableWritesTest,
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
const auto uuid = UUID::gen();
- const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0);
+ const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0);
txnParticipant->onWriteOpCompletedOnPrimary(
opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none);
@@ -520,12 +502,12 @@ TEST_F(TransactionParticipantRetryableWritesTest,
wuow.commit();
}
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0));
}
TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) {
- const auto sessionId = makeLogicalSessionIdForTest();
+ const auto sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 2;
{
@@ -576,9 +558,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
}());
}
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0),
AssertionException,
@@ -595,18 +576,17 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru
TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) {
const auto uuid = UUID::gen();
- const auto sessionId = makeLogicalSessionIdForTest();
+ const auto sessionId = *opCtx()->getLogicalSessionId();
const TxnNumber txnNum = 2;
+ const auto txnParticipant = TransactionParticipant::get(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
+ txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
+
OperationSessionInfo osi;
osi.setSessionId(sessionId);
osi.setTxnNumber(txnNum);
- Session session(sessionId);
- const auto txnParticipant = TransactionParticipant::get(&session);
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
- txnParticipant->beginOrContinue(txnNum, boost::none, boost::none);
-
auto firstOpTime = ([&]() {
AutoGetCollection autoColl(opCtx(), kNss, MODE_IX);
WriteUnitOfWork wuow(opCtx());
@@ -671,7 +651,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke
// Should have the same behavior after loading state from storage.
txnParticipant->invalidate();
- txnParticipant->refreshFromStorageIfNeeded(opCtx());
+ txnParticipant->refreshFromStorageIfNeeded();
{
auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1);
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index b067207d90a..bc85d8e36a8 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -2677,8 +2677,8 @@ TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) {
txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), boost::none, boost::none);
txnParticipant->unstashTransactionResources(opCtx(), "find");
- // Build a BSONObj containing the details which we expect to see reported when we call
- // Session::reportUnstashedState. For a retryable write, we should only include the txnNumber.
+ // Build a BSONObj containing the details which we expect to see reported when we invoke
+ // reportUnstashedState. For a retryable write, we should only include the txnNumber.
BSONObjBuilder reportBuilder;
BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction"));
BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters"));