summaryrefslogtreecommitdiff
path: root/src/mongo/db/transaction_participant.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/db/transaction_participant.cpp')
-rw-r--r--src/mongo/db/transaction_participant.cpp43
1 files changed, 38 insertions, 5 deletions
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 62296b07c24..a046061cd3f 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -463,7 +463,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// Always check session's txnNumber and '_txnState', since they can be modified by session
// kill and migration, which do not check out the session.
_checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false);
-
// If this is not a multi-document transaction, there is nothing to unstash.
if (_txnState.isNone(lg)) {
invariant(!_txnResourceStash);
@@ -475,7 +474,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
if (_txnResourceStash) {
// Transaction resources already exist for this transaction. Transfer them from the
// stash to the operation context.
-
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Only the first command in a transaction may specify a readConcern",
@@ -595,6 +593,23 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot);
abortGuard.Dismiss();
+
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "This transaction's oldest oplog entry Timestamp has already "
+ << "been set to: "
+ << _oldestOplogEntryTS->toString());
+ // Keep track of the Timestamp from the first oplog entry written by this transaction.
+ _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp();
+
+ // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently
+ // only write an oplog entry for an in progress transaction when it is in the prepare state
+ // but this will change when we allow multiple oplog entries per transaction.
+ {
+ stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
+ _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx),
+ *_oldestOplogEntryTS);
+ }
+
return prepareOplogSlot.opTime.getTimestamp();
}
@@ -644,11 +659,16 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie
void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
uassert(ErrorCodes::InvalidOptions,
"commitTransaction must provide commitTimestamp to prepared transaction.",
!_txnState.isPrepared(lk));
+ // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "The oldest oplog entry Timestamp should not have been set because "
+ << "this transaction is not prepared. But, it is currently "
+ << _oldestOplogEntryTS->toString());
+
// Always check session's txnNumber and '_txnState', since they can be modified by session kill
// and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
@@ -743,8 +763,10 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l
const auto curTime = curTimeMicros64();
{
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
- _transactionMetricsObserver.onCommit(
- ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext()));
+ _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx),
+ curTime,
+ _oldestOplogEntryTS,
+ &Top::get(getGlobalServiceContext()));
_transactionMetricsObserver.onTransactionOperation(
opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics);
}
@@ -823,6 +845,13 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction(
_stashActiveTransaction(lock, opCtx);
return;
}
+
+ // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "The oldest oplog entry Timestamp should not have been set because "
+ << "this transaction is not prepared. But, it is currently "
+ << _oldestOplogEntryTS->toString());
+
_abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress);
} catch (...) {
// It is illegal for this to throw so we catch and log this here for diagnosability.
@@ -897,6 +926,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortInactive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
curTime,
+ _oldestOplogEntryTS,
&Top::get(getGlobalServiceContext()));
}
_logSlowTransaction(wl,
@@ -909,6 +939,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortActive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
curTime,
+ _oldestOplogEntryTS,
&Top::get(getGlobalServiceContext()));
}
@@ -916,6 +947,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionOperations.clear();
_txnState.transitionTo(wl, TransactionState::kAborted);
_prepareOpTime = repl::OpTime();
+ _oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_getSession()->unlockTxnNumber();
@@ -1290,6 +1322,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
_transactionMetricsObserver.resetSingleTransactionStats(txnNumber);
}
_prepareOpTime = repl::OpTime();
+ _oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_multikeyPathInfo.clear();
_autoCommit = boost::none;