diff options
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 43 |
1 files changed, 38 insertions, 5 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 62296b07c24..a046061cd3f 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -463,7 +463,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // Always check session's txnNumber and '_txnState', since they can be modified by session // kill and migration, which do not check out the session. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); - // If this is not a multi-document transaction, there is nothing to unstash. if (_txnState.isNone(lg)) { invariant(!_txnResourceStash); @@ -475,7 +474,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the // stash to the operation context. - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", @@ -595,6 +593,23 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); abortGuard.Dismiss(); + + invariant(!_oldestOplogEntryTS, + str::stream() << "This transaction's oldest oplog entry Timestamp has already " + << "been set to: " + << _oldestOplogEntryTS->toString()); + // Keep track of the Timestamp from the first oplog entry written by this transaction. + _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp(); + + // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently + // only write an oplog entry for an in progress transaction when it is in the prepare state + // but this will change when we allow multiple oplog entries per transaction. + { + stdx::lock_guard<stdx::mutex> lm(_metricsMutex); + _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx), + *_oldestOplogEntryTS); + } + return prepareOplogSlot.opTime.getTimestamp(); } @@ -644,11 +659,16 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); - uassert(ErrorCodes::InvalidOptions, "commitTransaction must provide commitTimestamp to prepared transaction.", !_txnState.isPrepared(lk)); + // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. + invariant(!_oldestOplogEntryTS, + str::stream() << "The oldest oplog entry Timestamp should not have been set because " + << "this transaction is not prepared. But, it is currently " + << _oldestOplogEntryTS->toString()); + // Always check session's txnNumber and '_txnState', since they can be modified by session kill // and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); @@ -743,8 +763,10 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l const auto curTime = curTimeMicros64(); { stdx::lock_guard<stdx::mutex> lm(_metricsMutex); - _transactionMetricsObserver.onCommit( - ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext())); + _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx), + curTime, + _oldestOplogEntryTS, + &Top::get(getGlobalServiceContext())); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } @@ -823,6 +845,13 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction( _stashActiveTransaction(lock, opCtx); return; } + + // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. + invariant(!_oldestOplogEntryTS, + str::stream() << "The oldest oplog entry Timestamp should not have been set because " + << "this transaction is not prepared. But, it is currently " + << _oldestOplogEntryTS->toString()); + _abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress); } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. @@ -897,6 +926,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortInactive( ServerTransactionsMetrics::get(getGlobalServiceContext()), curTime, + _oldestOplogEntryTS, &Top::get(getGlobalServiceContext())); } _logSlowTransaction(wl, @@ -909,6 +939,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortActive( ServerTransactionsMetrics::get(getGlobalServiceContext()), curTime, + _oldestOplogEntryTS, &Top::get(getGlobalServiceContext())); } @@ -916,6 +947,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionOperations.clear(); _txnState.transitionTo(wl, TransactionState::kAborted); _prepareOpTime = repl::OpTime(); + _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _getSession()->unlockTxnNumber(); @@ -1290,6 +1322,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); } _prepareOpTime = repl::OpTime(); + _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; |