summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r--src/mongo/db/transaction_participant.cpp43
1 files changed, 36 insertions, 7 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 7a5d0e812e8..8f710669788 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -928,7 +928,8 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
// We need to unlock the session to run the opObserver onTransactionPrepare, which calls back
// into the session.
lk.unlock();
- opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot);
+ opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(
+ opCtx, prepareOplogSlot, retrieveCompletedTransactionOperations(opCtx));
abortGuard.dismiss();
@@ -992,7 +993,7 @@ void TransactionParticipant::addTransactionOperation(OperationContext* opCtx,
_transactionOperationBytes <= BSONObjMaxInternalSize);
}
-std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrieveOperations(
+std::vector<repl::ReplOperation>& TransactionParticipant::retrieveCompletedTransactionOperations(
OperationContext* opCtx) {
stdx::lock_guard<stdx::mutex> lk(_mutex);
@@ -1000,13 +1001,32 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie
// and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
- // Ensure that we only ever end a transaction when prepared or in progress.
- invariant(_txnState.isInSet(lk, TransactionState::kPrepared | TransactionState::kInProgress),
+ // Ensure that we only ever retrieve a transaction's completed operations when in progress,
+ // committing with prepare, or prepared.
+ invariant(_txnState.isInSet(lk,
+ TransactionState::kInProgress |
+ TransactionState::kCommittingWithPrepare |
+ TransactionState::kPrepared),
+ str::stream() << "Current state: " << _txnState);
+
+ return _transactionOperations;
+}
+
+void TransactionParticipant::clearOperationsInMemory(OperationContext* opCtx) {
+ stdx::lock_guard<stdx::mutex> lk(_mutex);
+
+ // Always check session's txnNumber and '_txnState', since they can be modified by session kill
+ // and migration, which do not check out the session.
+ _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
+
+ // Ensure that we only ever end a transaction when committing with prepare or in progress.
+ invariant(_txnState.isInSet(
+ lk, TransactionState::kCommittingWithPrepare | TransactionState::kInProgress),
str::stream() << "Current state: " << _txnState);
invariant(_autoCommit);
_transactionOperationBytes = 0;
- return std::move(_transactionOperations);
+ _transactionOperations.clear();
}
void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) {
@@ -1028,7 +1048,11 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx
lk.unlock();
auto opObserver = opCtx->getServiceContext()->getOpObserver();
invariant(opObserver);
- opObserver->onTransactionCommit(opCtx, boost::none, boost::none);
+ opObserver->onTransactionCommit(
+ opCtx, boost::none, boost::none, retrieveCompletedTransactionOperations(opCtx));
+
+ clearOperationsInMemory(opCtx);
+
lk.lock();
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
@@ -1108,9 +1132,14 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx,
{
// Once the transaction is committed, the oplog entry must be written.
UninterruptibleLockGuard lockGuard(opCtx->lockState());
- opObserver->onTransactionCommit(opCtx, commitOplogSlot, commitTimestamp);
+ opObserver->onTransactionCommit(opCtx,
+ commitOplogSlot,
+ commitTimestamp,
+ retrieveCompletedTransactionOperations(opCtx));
}
+ clearOperationsInMemory(opCtx);
+
lk.lock();
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);