diff options
-rw-r--r-- | src/mongo/db/op_observer_impl_test.cpp | 4 | ||||
-rw-r--r-- | src/mongo/db/operation_context.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/operation_context.h | 11 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/s/session_catalog_migration_destination_test.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/service_entry_point_common.cpp | 7 | ||||
-rw-r--r-- | src/mongo/db/session_catalog_mongod.cpp | 2 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 73 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 102 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_retryable_writes_test.cpp | 170 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 4 |
11 files changed, 180 insertions, 199 deletions
diff --git a/src/mongo/db/op_observer_impl_test.cpp b/src/mongo/db/op_observer_impl_test.cpp index 7110ff5aeb8..1fb9711b7c2 100644 --- a/src/mongo/db/op_observer_impl_test.cpp +++ b/src/mongo/db/op_observer_impl_test.cpp @@ -349,7 +349,7 @@ TEST_F(OpObserverSessionCatalogRollbackTest, // Create a session and sync it from disk auto session = sessionCatalog->checkOutSession(opCtx.get()); const auto txnParticipant = TransactionParticipant::get(session.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); + txnParticipant->refreshFromStorageIfNeeded(); // Simulate a write occurring on that session simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); @@ -398,7 +398,7 @@ TEST_F(OpObserverSessionCatalogRollbackTest, // Create a session and sync it from disk auto session = sessionCatalog->checkOutSession(opCtx.get()); const auto txnParticipant = TransactionParticipant::get(session.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx.get()); + txnParticipant->refreshFromStorageIfNeeded(); // Simulate a write occurring on that session simulateSessionWrite(opCtx.get(), txnParticipant, nss, txnNum, stmtId); diff --git a/src/mongo/db/operation_context.cpp b/src/mongo/db/operation_context.cpp index 833eb145938..88af8855212 100644 --- a/src/mongo/db/operation_context.cpp +++ b/src/mongo/db/operation_context.cpp @@ -85,6 +85,8 @@ OperationContext::OperationContext(Client* client, unsigned int opId) _elapsedTime(client ? client->getServiceContext()->getTickSource() : SystemTickSource::get()) {} +OperationContext::~OperationContext() = default; + void OperationContext::setDeadlineAndMaxTime(Date_t when, Microseconds maxTime, ErrorCodes::Error timeoutError) { diff --git a/src/mongo/db/operation_context.h b/src/mongo/db/operation_context.h index 92845652984..df8694b0e05 100644 --- a/src/mongo/db/operation_context.h +++ b/src/mongo/db/operation_context.h @@ -79,8 +79,7 @@ class OperationContext : public Interruptible, public Decorable<OperationContext public: OperationContext(Client* client, unsigned int opId); - - virtual ~OperationContext() = default; + virtual ~OperationContext(); /** * Interface for durability. Caller DOES NOT own pointer. @@ -164,7 +163,7 @@ public: /** * Returns the session ID associated with this operation, if there is one. */ - boost::optional<LogicalSessionId> getLogicalSessionId() const { + const boost::optional<LogicalSessionId>& getLogicalSessionId() const { return _lsid; } @@ -407,7 +406,9 @@ private: friend class WriteUnitOfWork; friend class repl::UnreplicatedWritesBlock; + Client* const _client; + const unsigned int _opId; boost::optional<LogicalSessionId> _lsid; @@ -447,8 +448,8 @@ private: WriteConcernOptions _writeConcern; - Date_t _deadline = - Date_t::max(); // The timepoint at which this operation exceeds its time limit. + // The timepoint at which this operation exceeds its time limit. + Date_t _deadline = Date_t::max(); ErrorCodes::Error _timeoutError = ErrorCodes::ExceededTimeLimit; bool _ignoreInterrupts = false; diff --git a/src/mongo/db/s/session_catalog_migration_destination.cpp b/src/mongo/db/s/session_catalog_migration_destination.cpp index f6a75f6f772..036e319c59e 100644 --- a/src/mongo/db/s/session_catalog_migration_destination.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination.cpp @@ -249,7 +249,7 @@ ProcessOplogResult processSessionOplog(OperationContext* opCtx, auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, result.sessionId); auto const txnParticipant = TransactionParticipant::get(scopedSession.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx); + txnParticipant->refreshFromStorageIfNeeded(); if (!txnParticipant->onMigrateBeginOnPrimary(opCtx, result.txnNum, stmtId)) { // Don't continue migrating the transaction history diff --git a/src/mongo/db/s/session_catalog_migration_destination_test.cpp b/src/mongo/db/s/session_catalog_migration_destination_test.cpp index 75f4f390fe4..ffc5cd400f1 100644 --- a/src/mongo/db/s/session_catalog_migration_destination_test.cpp +++ b/src/mongo/db/s/session_catalog_migration_destination_test.cpp @@ -1589,7 +1589,7 @@ TEST_F(SessionCatalogMigrationDestinationTest, auto scopedSession = SessionCatalog::get(opCtx)->checkOutSession(opCtx, *sessionInfo1.getSessionId()); const auto txnParticipant = TransactionParticipant::get(scopedSession.get()); - txnParticipant->refreshFromStorageIfNeeded(opCtx); + txnParticipant->refreshFromStorageIfNeeded(); txnParticipant->beginOrContinue(3, boost::none, boost::none); } diff --git a/src/mongo/db/service_entry_point_common.cpp b/src/mongo/db/service_entry_point_common.cpp index 380e7784bdb..e365b2bdafe 100644 --- a/src/mongo/db/service_entry_point_common.cpp +++ b/src/mongo/db/service_entry_point_common.cpp @@ -370,17 +370,16 @@ void invokeWithSessionCheckedOut(OperationContext* opCtx, if (sessionOptions.getCoordinator() == boost::optional<bool>(true)) { createTransactionCoordinator(opCtx, *sessionOptions.getTxnNumber()); } - } - - if (txnParticipant->inMultiDocumentTransaction() && !sessionOptions.getStartTransaction()) { + } else if (txnParticipant->inMultiDocumentTransaction()) { const auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", readConcernArgs.isEmpty()); } + + txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName()); } - txnParticipant->unstashTransactionResources(opCtx, invocation->definition()->getName()); ScopeGuard guard = MakeGuard([&txnParticipant, opCtx]() { txnParticipant->abortActiveUnpreparedOrStashPreparedTransaction(opCtx); }); diff --git a/src/mongo/db/session_catalog_mongod.cpp b/src/mongo/db/session_catalog_mongod.cpp index 9ef31bc588b..c2ef072d72a 100644 --- a/src/mongo/db/session_catalog_mongod.cpp +++ b/src/mongo/db/session_catalog_mongod.cpp @@ -194,7 +194,7 @@ MongoDOperationContextSession::MongoDOperationContextSession(OperationContext* o : _operationContextSession(opCtx) { if (!opCtx->getClient()->isInDirectClient()) { const auto txnParticipant = TransactionParticipant::get(opCtx); - txnParticipant->refreshFromStorageIfNeeded(opCtx); + txnParticipant->refreshFromStorageIfNeeded(); } } diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 5f67b995cf7..5929c41f752 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -306,6 +306,13 @@ const LogicalSessionId& TransactionParticipant::_sessionId() const { return owningSession->getSessionId(); } +OperationContext* TransactionParticipant::_opCtx() const { + const auto* owningSession = getTransactionParticipant.owner(this); + auto* opCtx = owningSession->currentOperation(); + invariant(opCtx); + return opCtx; +} + void TransactionParticipant::_beginOrContinueRetryableWrite(WithLock wl, TxnNumber txnNumber) { if (txnNumber > _activeTxnNumber) { // New retryable write. @@ -697,7 +704,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx return; } - _checkIsCommandValidWithTxnState(lg, opCtx, cmdName); + _checkIsCommandValidWithTxnState(lg, *opCtx->getTxnNumber(), cmdName); if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the @@ -713,6 +720,7 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // If we have no transaction resources then we cannot be prepared. If we're not in progress, // we don't do anything else. invariant(!_txnState.isPrepared(lg)); + if (!_txnState.isInProgress(lg)) { // At this point we're either committed and this is a 'commitTransaction' command, or we // are in the process of committing. @@ -1115,6 +1123,7 @@ bool TransactionParticipant::expired() const { void TransactionParticipant::abortActiveTransaction(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lock(_mutex); + // This function shouldn't throw if the transaction is already aborted. _checkIsActiveTransaction(lock, *opCtx->getTxnNumber(), false); _abortActiveTransaction( @@ -1186,9 +1195,11 @@ void TransactionParticipant::_abortActiveTransaction(stdx::unique_lock<stdx::mut // unlock the session to run the opObserver onTransactionAbort, which calls back into the // session. lock.unlock(); + auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); opObserver->onTransactionAbort(opCtx, abortOplogSlot); + lock.lock(); // We do not check if the active transaction number is correct here because we handle it below. @@ -1297,17 +1308,17 @@ void TransactionParticipant::_checkIsActiveTransaction(WithLock wl, } void TransactionParticipant::_checkIsCommandValidWithTxnState(WithLock wl, - OperationContext* opCtx, + const TxnNumber& requestTxnNumber, const std::string& cmdName) { // Throw NoSuchTransaction error instead of TransactionAborted error since this is the entry // point of transaction execution. uassert(ErrorCodes::NoSuchTransaction, - str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been aborted.", + str::stream() << "Transaction " << requestTxnNumber << " has been aborted.", !_txnState.isAborted(wl)); // Cannot change committed transaction but allow retrying commitTransaction command. uassert(ErrorCodes::TransactionCommitted, - str::stream() << "Transaction " << *opCtx->getTxnNumber() << " has been committed.", + str::stream() << "Transaction " << requestTxnNumber << " has been committed.", cmdName == "commitTransaction" || !_txnState.isCommitted(wl)); // Disallow operations other than abort, prepare or commit on a prepared transaction @@ -1486,7 +1497,7 @@ void TransactionParticipant::_reportTransactionStats(WithLock wl, std::string TransactionParticipant::_transactionInfoForLog( const SingleThreadedLockStats* lockStats, TransactionState::StateFlag terminationCause, - repl::ReadConcernArgs readConcernArgs) { + repl::ReadConcernArgs readConcernArgs) const { invariant(lockStats); invariant(terminationCause == TransactionState::kCommitted || terminationCause == TransactionState::kAborted); @@ -1596,45 +1607,34 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); } -void TransactionParticipant::refreshFromStorageIfNeeded(OperationContext* opCtx) { - if (opCtx->getClient()->isInDirectClient()) { - return; - } - +void TransactionParticipant::refreshFromStorageIfNeeded() { + const auto opCtx = _opCtx(); + invariant(!opCtx->getClient()->isInDirectClient()); invariant(!opCtx->lockState()->isLocked()); - stdx::unique_lock<stdx::mutex> ul(_mutex); - - while (!_isValid) { - const int numInvalidations = _numInvalidations; - - ul.unlock(); + if (_isValid) + return; - auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId()); + auto activeTxnHistory = fetchActiveTransactionHistory(opCtx, _sessionId()); - ul.lock(); + stdx::lock_guard<stdx::mutex> lg(_mutex); - // Protect against concurrent refreshes or invalidations - if (!_isValid && _numInvalidations == numInvalidations) { - _isValid = true; - _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); + _lastWrittenSessionRecord = std::move(activeTxnHistory.lastTxnRecord); - if (_lastWrittenSessionRecord) { - _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); - _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); - _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; + if (_lastWrittenSessionRecord) { + _activeTxnNumber = _lastWrittenSessionRecord->getTxnNum(); + _activeTxnCommittedStatements = std::move(activeTxnHistory.committedStatements); + _hasIncompleteHistory = activeTxnHistory.hasIncompleteHistory; - if (activeTxnHistory.transactionCommitted) { - _txnState.transitionTo( - ul, - TransactionState::kCommitted, - TransactionState::TransitionValidation::kRelaxTransitionValidation); - } - } - - break; + if (activeTxnHistory.transactionCommitted) { + _txnState.transitionTo( + lg, + TransactionState::kCommitted, + TransactionState::TransitionValidation::kRelaxTransitionValidation); } } + + _isValid = true; } void TransactionParticipant::onWriteOpCompletedOnPrimary( @@ -1723,9 +1723,8 @@ void TransactionParticipant::onMigrateCompletedOnPrimary(OperationContext* opCtx } void TransactionParticipant::_invalidate(WithLock) { - _activeTxnNumber = kUninitializedTxnNumber; _isValid = false; - _numInvalidations++; + _activeTxnNumber = kUninitializedTxnNumber; _lastWrittenSessionRecord.reset(); // Reset the transactions metrics. diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 7c493bae0d9..0f41881fd01 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -92,7 +92,6 @@ public: * Ephemerally holds the Client lock associated with opCtx. */ TxnResources(OperationContext* opCtx, bool keepTicket = false); - ~TxnResources(); // Rule of 5: because we have a class-defined destructor, we need to explictly specify @@ -283,33 +282,20 @@ public: * If this session is holding stashed locks in _txnResourceStash, reports the current state of * the session using the provided builder. Locks the session object's mutex while running. */ + BSONObj reportStashedState() const; void reportStashedState(BSONObjBuilder* builder) const; - std::string transactionInfoForLogForTest(const SingleThreadedLockStats* lockStats, - bool committed, - repl::ReadConcernArgs readConcernArgs) { - stdx::lock_guard<stdx::mutex> lk(_mutex); - TransactionState::StateFlag terminationCause = - committed ? TransactionState::kCommitted : TransactionState::kAborted; - return _transactionInfoForLog(lockStats, terminationCause, readConcernArgs); - } - /** * If this session is not holding stashed locks in _txnResourceStash (transaction is active), * reports the current state of the session using the provided builder. Locks the session * object's mutex while running. + * * If this is called from a thread other than the owner of the opCtx, that thread must be * holding the client lock. */ void reportUnstashedState(OperationContext* opCtx, BSONObjBuilder* builder) const; /** - * Convenience method which creates and populates a BSONObj containing the stashed state. - * Returns an empty BSONObj if this session has no stashed resources. - */ - BSONObj reportStashedState() const; - - /** * Aborts the transaction outside the transaction, releasing transaction resources. * * Not called with session checked out. @@ -385,16 +371,6 @@ public: */ void beginOrContinueTransactionUnconditionally(TxnNumber txnNumber); - void transitionToPreparedforTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _txnState.transitionTo(lk, TransactionState::kPrepared); - } - - void transitionToAbortedforTest() { - stdx::lock_guard<stdx::mutex> lk(_mutex); - _txnState.transitionTo(lk, TransactionState::kAborted); - } - /** * Blocking method, which loads the transaction state from storage if it has been marked as * needing refresh. @@ -402,7 +378,7 @@ public: * In order to avoid the possibility of deadlock, this method must not be called while holding a * lock. */ - void refreshFromStorageIfNeeded(OperationContext* opCtx); + void refreshFromStorageIfNeeded(); TxnNumber getActiveTxnNumber() const { stdx::lock_guard<stdx::mutex> lg(_mutex); @@ -500,6 +476,25 @@ public: */ bool checkStatementExecutedNoOplogEntryFetch(TxnNumber txnNumber, StmtId stmtId) const; + std::string transactionInfoForLogForTest(const SingleThreadedLockStats* lockStats, + bool committed, + repl::ReadConcernArgs readConcernArgs) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + TransactionState::StateFlag terminationCause = + committed ? TransactionState::kCommitted : TransactionState::kAborted; + return _transactionInfoForLog(lockStats, terminationCause, readConcernArgs); + } + + void transitionToPreparedforTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _txnState.transitionTo(lk, TransactionState::kPrepared); + } + + void transitionToAbortedforTest() { + stdx::lock_guard<stdx::mutex> lk(_mutex); + _txnState.transitionTo(lk, TransactionState::kAborted); + } + private: /** * Reserves a slot in the oplog with an open storage-transaction while it is alive. Reserves the @@ -619,6 +614,9 @@ private: // Shortcut to obtain the id of the session under which this participant runs const LogicalSessionId& _sessionId() const; + // Shortcut to obtain the currently checked-out operation context under this participant runs + OperationContext* _opCtx() const; + /** * Performing any checks based on the in-memory state of the TransactionParticipant requires * that the object is fully in sync with its on-disk representation in the transactions table. @@ -701,7 +699,7 @@ private: // Checks if the command can be run on this transaction based on the state of the transaction. void _checkIsCommandValidWithTxnState(WithLock, - OperationContext* opCtx, + const TxnNumber& requestTxnNumber, const std::string& cmdName); // Logs the transaction information if it has run slower than the global parameter slowMS. The @@ -717,7 +715,7 @@ private: // passed in order for this method to be called. std::string _transactionInfoForLog(const SingleThreadedLockStats* lockStats, TransactionState::StateFlag terminationCause, - repl::ReadConcernArgs readConcernArgs); + repl::ReadConcernArgs readConcernArgs) const; // Reports transaction stats for both active and inactive transactions using the provided // builder. The lock may be either a lock on _mutex or a lock on _metricsMutex. @@ -752,8 +750,6 @@ private: // Protects the member variables below. mutable stdx::mutex _mutex; - bool _inShutdown{false}; - // Holds transaction resources between network operations. boost::optional<TxnResources> _txnResourceStash; @@ -790,8 +786,32 @@ private: // should wait for write concern for on commit. repl::OpTime _speculativeTransactionReadOpTime; + // Contains uncommitted multi-key path info entries which were modified under this transaction + // so they can be applied to subsequent opreations before the transaction commits std::vector<MultikeyPathInfo> _multikeyPathInfo; + // Tracks the OpTime of the first oplog entry written by this TransactionParticipant. + boost::optional<repl::OpTime> _oldestOplogEntryOpTime; + + // Tracks the OpTime of the abort/commit oplog entry associated with this transaction. + boost::optional<repl::OpTime> _finishOpTime; + + // Protects _transactionMetricsObserver. The concurrency rules are that const methods on + // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for + // non-const methods, both mutexes must be held, with _mutex being taken before _metricsMutex. + // No other locks, particularly including the Client lock, may be taken while holding + // _metricsMutex. + mutable stdx::mutex _metricsMutex; + + // Tracks and updates transaction metrics upon the appropriate transaction event. + TransactionMetricsObserver _transactionMetricsObserver; + + // Only set if the server is shutting down and it has been ensured that no new requests will be + // accepted. Ensures that any transaction resources will not be stashed from the operation + // context onto the transaction participant when the session is checked-in so that locks can + // automatically get freed. + bool _inShutdown{false}; + // // Retryable writes state // @@ -799,10 +819,6 @@ private: // Specifies whether the session information needs to be refreshed from storage bool _isValid{false}; - // Counter, incremented with each call to invalidate in order to discern invalidations, which - // happen during refresh - int _numInvalidations{0}; - // Set to true if incomplete history is detected. For example, when the oplog to a write was // truncated because it was too old. bool _hasIncompleteHistory{false}; @@ -814,22 +830,6 @@ private: // opTime. Used for fast retryability check and retrieving the previous write's data without // having to scan through the oplog. CommittedStatementTimestampMap _activeTxnCommittedStatements; - - // Protects _transactionMetricsObserver. The concurrency rules are that const methods on - // _transactionMetricsObserver may be called under either _mutex or _metricsMutex, but for - // non-const methods, both mutexes must be held, with _mutex being taken before _metricsMutex. - // No other locks, particularly including the Client lock, may be taken while holding - // _metricsMutex. - mutable stdx::mutex _metricsMutex; - - // Tracks and updates transaction metrics upon the appropriate transaction event. - TransactionMetricsObserver _transactionMetricsObserver; - - // Tracks the OpTime of the first oplog entry written by this TransactionParticipant. - boost::optional<repl::OpTime> _oldestOplogEntryOpTime; - - // Tracks the OpTime of the abort/commit oplog entry associated with this transaction. - boost::optional<repl::OpTime> _finishOpTime; }; } // namespace mongo diff --git a/src/mongo/db/transaction_participant_retryable_writes_test.cpp b/src/mongo/db/transaction_participant_retryable_writes_test.cpp index 5914862a591..f3900a43b5f 100644 --- a/src/mongo/db/transaction_participant_retryable_writes_test.cpp +++ b/src/mongo/db/transaction_participant_retryable_writes_test.cpp @@ -145,15 +145,16 @@ protected: MongoDSessionCatalog::onStepUp(opCtx()); const auto service = opCtx()->getServiceContext(); - OpObserverRegistry* opObserverRegistry = - dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); - auto mockObserver = stdx::make_unique<OpObserverMock>(); - _opObserver = mockObserver.get(); - opObserverRegistry->addObserver(std::move(mockObserver)); + + const auto opObserverRegistry = dynamic_cast<OpObserverRegistry*>(service->getOpObserver()); + opObserverRegistry->addObserver(stdx::make_unique<OpObserverMock>()); + + opCtx()->setLogicalSessionId(makeLogicalSessionIdForTest()); + _opContextSession.emplace(opCtx()); } void tearDown() final { - _opObserver = nullptr; + _opContextSession.reset(); MockReplCoordServerFixture::tearDown(); } @@ -196,16 +197,16 @@ protected: OplogSlot()); } - repl::OpTime writeTxnRecord(Session* session, - TxnNumber txnNum, + repl::OpTime writeTxnRecord(TxnNumber txnNum, StmtId stmtId, repl::OpTime prevOpTime, boost::optional<DurableTxnStateEnum> txnState) { - const auto uuid = UUID::gen(); - + const auto session = OperationContextSession::get(opCtx()); const auto txnParticipant = TransactionParticipant::get(session); txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); + const auto uuid = UUID::gen(); + AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto opTime = @@ -217,11 +218,12 @@ protected: return opTime; } - void assertTxnRecord(Session* session, - TxnNumber txnNum, + void assertTxnRecord(TxnNumber txnNum, StmtId stmtId, repl::OpTime opTime, boost::optional<DurableTxnStateEnum> txnState) { + const auto session = OperationContextSession::get(opCtx()); + DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, {BSON("_id" << session->getSessionId().toBSON())}); @@ -243,23 +245,21 @@ protected: ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_EQ(opTime, txnParticipant->getLastWriteOpTime(txnNum)); } - OpObserverMock* _opObserver = nullptr; +private: + boost::optional<OperationContextSession> _opContextSession; }; TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto& sessionId = *opCtx()->getLogicalSessionId(); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 20; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - - ASSERT_EQ(sessionId, session.getSessionId()); ASSERT(txnParticipant->getLastWriteOpTime(txnNum).isNull()); DBDirectClient client(opCtx()); @@ -270,15 +270,14 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryNotWrittenOnBegin) } TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrite) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 21; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - const auto opTime = writeTxnRecord(&session, txnNum, 0, {}, boost::none); + const auto opTime = writeTxnRecord(txnNum, 0, {}, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -298,13 +297,13 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionEntryWrittenAtFirstWrit TEST_F(TransactionParticipantRetryableWritesTest, StartingNewerTransactionUpdatesThePersistedSession) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); - const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); - const auto secondOpTime = writeTxnRecord(&session, 200, 1, firstOpTime, boost::none); + const auto& sessionId = *opCtx()->getLogicalSessionId(); + + const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none); + const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, boost::none); DBDirectClient client(opCtx()); auto cursor = client.query(NamespaceString::kSessionTransactionsTableNamespace, @@ -322,33 +321,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_EQ(secondOpTime, txnParticipant->getLastWriteOpTime(200)); } TEST_F(TransactionParticipantRetryableWritesTest, TransactionTableUpdatesReplaceEntireDocument) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); - - const auto firstOpTime = writeTxnRecord(&session, 100, 0, {}, boost::none); - assertTxnRecord(&session, 100, 0, firstOpTime, boost::none); - const auto secondOpTime = - writeTxnRecord(&session, 200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); - assertTxnRecord(&session, 200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); - const auto thirdOpTime = - writeTxnRecord(&session, 300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); - assertTxnRecord(&session, 300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); - const auto fourthOpTime = writeTxnRecord(&session, 400, 3, thirdOpTime, boost::none); - assertTxnRecord(&session, 400, 3, fourthOpTime, boost::none); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + + const auto firstOpTime = writeTxnRecord(100, 0, {}, boost::none); + assertTxnRecord(100, 0, firstOpTime, boost::none); + const auto secondOpTime = writeTxnRecord(200, 1, firstOpTime, DurableTxnStateEnum::kPrepared); + assertTxnRecord(200, 1, secondOpTime, DurableTxnStateEnum::kPrepared); + const auto thirdOpTime = writeTxnRecord(300, 2, secondOpTime, DurableTxnStateEnum::kCommitted); + assertTxnRecord(300, 2, thirdOpTime, DurableTxnStateEnum::kCommitted); + const auto fourthOpTime = writeTxnRecord(400, 3, thirdOpTime, boost::none); + assertTxnRecord(400, 3, fourthOpTime, boost::none); } TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 20; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -360,10 +353,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, StartingOldTxnShouldAssert) { } TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionNotDefaultCreated) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + + const auto& sessionId = *opCtx()->getLogicalSessionId(); // Drop the transactions table BSONObj dropResult; @@ -385,29 +378,27 @@ TEST_F(TransactionParticipantRetryableWritesTest, SessionTransactionsCollectionN } TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); - const auto firstOpTime = writeTxnRecord(&session, txnNum, 1000, {}, boost::none); + const auto firstOpTime = writeTxnRecord(txnNum, 1000, {}, boost::none); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 1000)); ASSERT(!txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(!txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); - writeTxnRecord(&session, txnNum, 2000, firstOpTime, boost::none); + writeTxnRecord(txnNum, 2000, firstOpTime, boost::none); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); ASSERT(txnParticipant->checkStatementExecutedNoOplogEntryFetch(txnNum, 2000)); // Invalidate the session and ensure the statements still check out txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1000)); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 2000)); @@ -417,10 +408,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecuted) { } TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -432,9 +421,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForOldTr TEST_F(TransactionParticipantRetryableWritesTest, CheckStatementExecutedForInvalidatedTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); + const auto txnParticipant = TransactionParticipant::get(opCtx()); txnParticipant->invalidate(); ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), 100, 0), @@ -444,11 +431,10 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForOldTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + const auto& sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -476,10 +462,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryForInvalidatedTransactionThrows) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -487,7 +471,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto uuid = UUID::gen(); - const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); + const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); txnParticipant->invalidate(); @@ -499,10 +483,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, TEST_F(TransactionParticipantRetryableWritesTest, WriteOpCompletedOnPrimaryCommitIgnoresInvalidation) { - const auto sessionId = makeLogicalSessionIdForTest(); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); const TxnNumber txnNum = 100; txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); @@ -511,7 +493,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); const auto uuid = UUID::gen(); - const auto opTime = logOp(opCtx(), kNss, uuid, sessionId, txnNum, 0); + const auto opTime = logOp(opCtx(), kNss, uuid, *opCtx()->getLogicalSessionId(), txnNum, 0); txnParticipant->onWriteOpCompletedOnPrimary( opCtx(), txnNum, {0}, opTime, Date_t::now(), boost::none); @@ -520,12 +502,12 @@ TEST_F(TransactionParticipantRetryableWritesTest, wuow.commit(); } - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0)); } TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTruncation) { - const auto sessionId = makeLogicalSessionIdForTest(); + const auto sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 2; { @@ -576,9 +558,8 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru }()); } - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); ASSERT_THROWS_CODE(txnParticipant->checkStatementExecuted(opCtx(), txnNum, 0), AssertionException, @@ -595,18 +576,17 @@ TEST_F(TransactionParticipantRetryableWritesTest, IncompleteHistoryDueToOpLogTru TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingCheckedIsNotInCache) { const auto uuid = UUID::gen(); - const auto sessionId = makeLogicalSessionIdForTest(); + const auto sessionId = *opCtx()->getLogicalSessionId(); const TxnNumber txnNum = 2; + const auto txnParticipant = TransactionParticipant::get(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); + txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); + OperationSessionInfo osi; osi.setSessionId(sessionId); osi.setTxnNumber(txnNum); - Session session(sessionId); - const auto txnParticipant = TransactionParticipant::get(&session); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); - txnParticipant->beginOrContinue(txnNum, boost::none, boost::none); - auto firstOpTime = ([&]() { AutoGetCollection autoColl(opCtx(), kNss, MODE_IX); WriteUnitOfWork wuow(opCtx()); @@ -671,7 +651,7 @@ TEST_F(TransactionParticipantRetryableWritesTest, ErrorOnlyWhenStmtIdBeingChecke // Should have the same behavior after loading state from storage. txnParticipant->invalidate(); - txnParticipant->refreshFromStorageIfNeeded(opCtx()); + txnParticipant->refreshFromStorageIfNeeded(); { auto oplog = txnParticipant->checkStatementExecuted(opCtx(), txnNum, 1); diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index b067207d90a..bc85d8e36a8 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -2677,8 +2677,8 @@ TEST_F(TransactionsMetricsTest, ReportUnstashedResourcesForARetryableWrite) { txnParticipant->beginOrContinue(*opCtx()->getTxnNumber(), boost::none, boost::none); txnParticipant->unstashTransactionResources(opCtx(), "find"); - // Build a BSONObj containing the details which we expect to see reported when we call - // Session::reportUnstashedState. For a retryable write, we should only include the txnNumber. + // Build a BSONObj containing the details which we expect to see reported when we invoke + // reportUnstashedState. For a retryable write, we should only include the txnNumber. BSONObjBuilder reportBuilder; BSONObjBuilder transactionBuilder(reportBuilder.subobjStart("transaction")); BSONObjBuilder parametersBuilder(transactionBuilder.subobjStart("parameters")); |