diff options
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 43 |
1 files changed, 36 insertions, 7 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 7a5d0e812e8..8f710669788 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -928,7 +928,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, // We need to unlock the session to run the opObserver onTransactionPrepare, which calls back // into the session. lk.unlock(); - opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); + opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare( + opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx)); abortGuard.dismiss(); @@ -992,7 +993,7 @@ void TransactionParticipant::addTransactionOperation(OperationContext* opCtx, _transactionOperationBytes <= BSONObjMaxInternalSize); } -std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrieveOperations( +std::vector<repl::ReplOperation>& TransactionParticipant::retrieveCompletedTransactionOperations( OperationContext* opCtx) { stdx::lock_guard<stdx::mutex> lk(_mutex); @@ -1000,13 +1001,32 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie // and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); - // Ensure that we only ever end a transaction when prepared or in progress. - invariant(_txnState.isInSet(lk, TransactionState::kPrepared | TransactionState::kInProgress), + // Ensure that we only ever retrieve a transaction's completed operations when in progress, + // committing with prepare, or prepared. + invariant(_txnState.isInSet(lk, + TransactionState::kInProgress | + TransactionState::kCommittingWithPrepare | + TransactionState::kPrepared), + str::stream() << "Current state: " << _txnState); + + return _transactionOperations; +} + +void TransactionParticipant::clearOperationsInMemory(OperationContext* opCtx) { + stdx::lock_guard<stdx::mutex> lk(_mutex); + + // Always check session's txnNumber and '_txnState', since they can be modified by session kill + // and migration, which do not check out the session. + _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); + + // Ensure that we only ever end a transaction when committing with prepare or in progress. + invariant(_txnState.isInSet( + lk, TransactionState::kCommittingWithPrepare | TransactionState::kInProgress), str::stream() << "Current state: " << _txnState); invariant(_autoCommit); _transactionOperationBytes = 0; - return std::move(_transactionOperations); + _transactionOperations.clear(); } void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) { @@ -1028,7 +1048,11 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx lk.unlock(); auto opObserver = opCtx->getServiceContext()->getOpObserver(); invariant(opObserver); - opObserver->onTransactionCommit(opCtx, boost::none, boost::none); + opObserver->onTransactionCommit( + opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx)); + + clearOperationsInMemory(opCtx); + lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); @@ -1108,9 +1132,14 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, { // Once the transaction is committed, the oplog entry must be written. UninterruptibleLockGuard lockGuard(opCtx->lockState()); - opObserver->onTransactionCommit(opCtx, commitOplogSlot, commitTimestamp); + opObserver->onTransactionCommit(opCtx, + commitOplogSlot, + commitTimestamp, + retrieveCompletedTransactionOperations(opCtx)); } + clearOperationsInMemory(opCtx); + lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); |