summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorPavi Vetriselvan <pvselvan@umich.edu>2018-09-20 11:06:37 -0400
committerPavi Vetriselvan <pvselvan@umich.edu>2018-09-20 11:08:04 -0400
commitdc4fd67df2cbfdd4977007463cce8bbb25ffdd4d (patch)
tree55c1c21f2ccad68966e96f3d12e869daf5f9245b
parentc69ec130f14953056461cf40ddf368d8ecf7185b (diff)
downloadmongo-dc4fd67df2cbfdd4977007463cce8bbb25ffdd4d.tar.gz
SERVER-35873 Maintain oldest prepareTimestamp
-rw-r--r--src/mongo/db/server_transactions_metrics.cpp30
-rw-r--r--src/mongo/db/server_transactions_metrics.h30
-rw-r--r--src/mongo/db/transaction_metrics_observer.cpp30
-rw-r--r--src/mongo/db/transaction_metrics_observer.h19
-rw-r--r--src/mongo/db/transaction_participant.cpp43
-rw-r--r--src/mongo/db/transaction_participant.h3
-rw-r--r--src/mongo/db/transaction_participant_test.cpp158
7 files changed, 304 insertions, 9 deletions
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp
index b573996b50b..9fb9f1dee0c 100644
--- a/src/mongo/db/server_transactions_metrics.cpp
+++ b/src/mongo/db/server_transactions_metrics.cpp
@@ -111,6 +111,36 @@ void ServerTransactionsMetrics::incrementTotalCommitted() {
_totalCommitted.fetchAndAdd(1);
}
+boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const {
+ if (_oldestActiveOplogEntryTS.empty()) {
+ return boost::none;
+ }
+ return *(_oldestActiveOplogEntryTS.begin());
+}
+
+void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) {
+ auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS);
+ // If ret.second is false, the timestamp we tried to insert already existed.
+ invariant(ret.second == true,
+ str::stream() << "This oplog entry timestamp already exists."
+ << "TS: "
+ << oldestOplogEntryTS.toString());
+}
+
+void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) {
+ auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS);
+ invariant(it != _oldestActiveOplogEntryTS.end(),
+ str::stream() << "This oplog entry timestamp does not exist "
+ << "or has already been removed."
+ << "TS: "
+ << oldestOplogEntryTS.toString());
+ _oldestActiveOplogEntryTS.erase(it);
+}
+
+unsigned int ServerTransactionsMetrics::getTotalActiveTS() const {
+ return _oldestActiveOplogEntryTS.size();
+}
+
void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) {
stats->setCurrentActive(_currentActive.load());
stats->setCurrentInactive(_currentInactive.load());
diff --git a/src/mongo/db/server_transactions_metrics.h b/src/mongo/db/server_transactions_metrics.h
index f877a1f8b23..a026d921f6e 100644
--- a/src/mongo/db/server_transactions_metrics.h
+++ b/src/mongo/db/server_transactions_metrics.h
@@ -28,6 +28,9 @@
#pragma once
+#include <set>
+
+#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
#include "mongo/db/service_context.h"
#include "mongo/db/transactions_stats_gen.h"
@@ -68,6 +71,28 @@ public:
void incrementTotalCommitted();
/**
+ * Returns the Timestamp of the oldest oplog entry written across all open transactions.
+ * Returns boost::none if there are no transaction oplog entry Timestamps stored.
+ */
+ boost::optional<Timestamp> getOldestActiveTS() const;
+
+ /**
+ * Add the transaction's oplog entry Timestamp to a set of Timestamps.
+ */
+ void addActiveTS(Timestamp oldestOplogEntryTS);
+
+ /**
+ * Remove the corresponding transaction oplog entry Timestamp if the transaction commits or
+ * aborts.
+ */
+ void removeActiveTS(Timestamp oldestOplogEntryTS);
+
+ /**
+ * Returns the number of transaction oplog entry Timestamps currently stored.
+ */
+ unsigned int getTotalActiveTS() const;
+
+ /**
* Appends the accumulated stats to a transactions stats object.
*/
void updateStats(TransactionsStats* stats);
@@ -90,6 +115,11 @@ private:
// The total number of multi-document transaction commits.
AtomicUInt64 _totalCommitted{0};
+
+ // Maintain the oldest oplog entry Timestamp across all active transactions. Currently, we only
+ // write an oplog entry for an ongoing transaction if it is in the `prepare` state. By
+ // maintaining an ordered set of timestamps, the timestamp at the beginning will be the oldest.
+ std::set<Timestamp> _oldestActiveOplogEntryTS;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_metrics_observer.cpp b/src/mongo/db/transaction_metrics_observer.cpp
index ecc7c8bb225..0eec2a3d1a7 100644
--- a/src/mongo/db/transaction_metrics_observer.cpp
+++ b/src/mongo/db/transaction_metrics_observer.cpp
@@ -95,6 +95,7 @@ void TransactionMetricsObserver::onUnstash(ServerTransactionsMetrics* serverTran
void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTransactionsMetrics,
unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
Top* top) {
//
// Per transaction metrics.
@@ -116,10 +117,16 @@ void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTrans
serverTransactionsMetrics->decrementCurrentActive();
top->incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(curTime));
+
+ // Remove this transaction's oldest oplog entry Timestamp if one was written.
+ if (oldestOplogEntryTS) {
+ serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ }
}
void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics,
unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
Top* top) {
_onAbort(serverTransactionsMetrics, curTime, top);
//
@@ -135,16 +142,29 @@ void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* server
// Server wide transactions metrics.
//
serverTransactionsMetrics->decrementCurrentActive();
+
+ // Remove this transaction's oldest oplog entry Timestamp if one was written.
+ if (oldestOplogEntryTS) {
+ serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ }
}
void TransactionMetricsObserver::onAbortInactive(
- ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, Top* top) {
+ ServerTransactionsMetrics* serverTransactionsMetrics,
+ unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
+ Top* top) {
_onAbort(serverTransactionsMetrics, curTime, top);
//
// Server wide transactions metrics.
//
serverTransactionsMetrics->decrementCurrentInactive();
+
+ // Remove this transaction's oldest oplog entry Timestamp if one was written.
+ if (oldestOplogEntryTS) {
+ serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ }
}
void TransactionMetricsObserver::onTransactionOperation(Client* client,
@@ -176,4 +196,12 @@ void TransactionMetricsObserver::_onAbort(ServerTransactionsMetrics* serverTrans
top->incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(curTime));
}
+void TransactionMetricsObserver::onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics,
+ Timestamp prepareTimestamp) {
+ // Since we currently only write an oplog entry for an in progress transaction when it is in
+ // the prepare state, the prepareTimestamp is currently the oldest timestamp written to the
+ // oplog for this transaction.
+ serverTransactionsMetrics->addActiveTS(prepareTimestamp);
+}
+
} // namespace mongo
diff --git a/src/mongo/db/transaction_metrics_observer.h b/src/mongo/db/transaction_metrics_observer.h
index e3de57539b6..1ec30bedfff 100644
--- a/src/mongo/db/transaction_metrics_observer.h
+++ b/src/mongo/db/transaction_metrics_observer.h
@@ -67,27 +67,40 @@ public:
unsigned long long curTime);
/**
- * Updates relevant metrics when a transaction commits.
+ * Updates relevant metrics when a transaction commits. Also removes this transaction's oldest
+ * oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
*/
void onCommit(ServerTransactionsMetrics* serverTransactionsMetrics,
unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
Top* top);
/**
- * Updates relevant metrics when an active transaction aborts.
+ * Updates relevant metrics when an active transaction aborts. Also removes this transaction's
+ * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
*/
void onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics,
unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
Top* top);
/**
- * Updates relevant metrics when an inactive transaction aborts.
+ * Updates relevant metrics when an inactive transaction aborts. Also removes this transaction's
+ * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
*/
void onAbortInactive(ServerTransactionsMetrics* serverTransactionsMetrics,
unsigned long long curTime,
+ boost::optional<Timestamp> oldestOplogEntryTS,
Top* top);
/**
+ * Adds the prepareTimestamp, which is currently the Timestamp of the first oplog entry written
+ * by an active transaction, to the oldestActiveOplogEntryTS set.
+ */
+ void onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics,
+ Timestamp prepareTimestamp);
+
+ /**
* Updates relevant metrics when an operation running on the transaction completes. An operation
* may be a read/write operation, or an abort/commit command.
*/
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 62296b07c24..a046061cd3f 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -463,7 +463,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
// Always check session's txnNumber and '_txnState', since they can be modified by session
// kill and migration, which do not check out the session.
_checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false);
-
// If this is not a multi-document transaction, there is nothing to unstash.
if (_txnState.isNone(lg)) {
invariant(!_txnResourceStash);
@@ -475,7 +474,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx
if (_txnResourceStash) {
// Transaction resources already exist for this transaction. Transfer them from the
// stash to the operation context.
-
auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx);
uassert(ErrorCodes::InvalidOptions,
"Only the first command in a transaction may specify a readConcern",
@@ -595,6 +593,23 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot);
abortGuard.Dismiss();
+
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "This transaction's oldest oplog entry Timestamp has already "
+ << "been set to: "
+ << _oldestOplogEntryTS->toString());
+ // Keep track of the Timestamp from the first oplog entry written by this transaction.
+ _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp();
+
+ // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently
+ // only write an oplog entry for an in progress transaction when it is in the prepare state
+ // but this will change when we allow multiple oplog entries per transaction.
+ {
+ stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
+ _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx),
+ *_oldestOplogEntryTS);
+ }
+
return prepareOplogSlot.opTime.getTimestamp();
}
@@ -644,11 +659,16 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie
void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) {
stdx::unique_lock<stdx::mutex> lk(_mutex);
-
uassert(ErrorCodes::InvalidOptions,
"commitTransaction must provide commitTimestamp to prepared transaction.",
!_txnState.isPrepared(lk));
+ // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "The oldest oplog entry Timestamp should not have been set because "
+ << "this transaction is not prepared. But, it is currently "
+ << _oldestOplogEntryTS->toString());
+
// Always check session's txnNumber and '_txnState', since they can be modified by session kill
// and migration, which do not check out the session.
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
@@ -743,8 +763,10 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l
const auto curTime = curTimeMicros64();
{
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
- _transactionMetricsObserver.onCommit(
- ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext()));
+ _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx),
+ curTime,
+ _oldestOplogEntryTS,
+ &Top::get(getGlobalServiceContext()));
_transactionMetricsObserver.onTransactionOperation(
opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics);
}
@@ -823,6 +845,13 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction(
_stashActiveTransaction(lock, opCtx);
return;
}
+
+ // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
+ invariant(!_oldestOplogEntryTS,
+ str::stream() << "The oldest oplog entry Timestamp should not have been set because "
+ << "this transaction is not prepared. But, it is currently "
+ << _oldestOplogEntryTS->toString());
+
_abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress);
} catch (...) {
// It is illegal for this to throw so we catch and log this here for diagnosability.
@@ -897,6 +926,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortInactive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
curTime,
+ _oldestOplogEntryTS,
&Top::get(getGlobalServiceContext()));
}
_logSlowTransaction(wl,
@@ -909,6 +939,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortActive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
curTime,
+ _oldestOplogEntryTS,
&Top::get(getGlobalServiceContext()));
}
@@ -916,6 +947,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionOperations.clear();
_txnState.transitionTo(wl, TransactionState::kAborted);
_prepareOpTime = repl::OpTime();
+ _oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_getSession()->unlockTxnNumber();
@@ -1290,6 +1322,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
_transactionMetricsObserver.resetSingleTransactionStats(txnNumber);
}
_prepareOpTime = repl::OpTime();
+ _oldestOplogEntryTS = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_multikeyPathInfo.clear();
_autoCommit = boost::none;
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 57b1884db25..5068a658b20 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -626,6 +626,9 @@ private:
// Tracks and updates transaction metrics upon the appropriate transaction event.
TransactionMetricsObserver _transactionMetricsObserver;
+
+ // Tracks the Timestamp of the first oplog entry written by this TransactionParticipant.
+ boost::optional<Timestamp> _oldestOplogEntryTS;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index e3a64a45cdc..26d1f5e32e8 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -985,6 +985,8 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
+ // Check that the oldest prepareTimestamp is the one we just set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
ASSERT(_opObserver->transactionPrepared);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
}
@@ -1171,6 +1173,8 @@ TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) {
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
+ // Check that the oldest prepareTimestamp is the one we just set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransaction();
@@ -1195,6 +1199,8 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) {
// Check that prepareTimestamp gets set.
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
+ // Check that the oldest prepareTimestamp is the one we just set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransactionIfExpired();
@@ -1221,6 +1227,9 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
+ // Check that the oldest prepareTimestamp is the one we just set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+
txnParticipant->stashTransactionResources(opCtx());
{
@@ -1307,6 +1316,9 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti
txnParticipant->addTransactionOperation(opCtx(), operation);
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
+ // Check that the oldest prepareTimestamp is the one we just set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNum = *opCtx()->getTxnNumber();
_opObserver->onTransactionCommitFn = [&](bool wasPrepared) {
@@ -1322,6 +1334,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti
};
txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp);
+ // Check that we removed the prepareTimestamp from the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
}
TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) {
@@ -2838,5 +2852,149 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) {
ASSERT_EQUALS(1, countLogLinesContaining(expectedTransactionInfo));
}
+TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) {
+ auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ // Check that there are no Timestamps in the set.
+ unsigned int zero = 0;
+ ASSERT_EQ(totalActiveTxnTS, zero);
+
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ auto firstPrepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
+ // Check that we added a Timestamp to the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
+ // totalActiveTxnTS = 1
+ totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ // Check that the oldest prepareTimestamp is equal to firstPrepareTimestamp because there is
+ // only one prepared transaction on this Service.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), firstPrepareTimestamp);
+ ASSERT_FALSE(txnParticipant->transactionIsAborted());
+
+ txnParticipant->stashTransactionResources(opCtx());
+ auto originalClient = Client::releaseCurrent();
+
+ /**
+ * Make a new Session, Client, OperationContext and transaction.
+ */
+ auto service = opCtx()->getServiceContext();
+ auto newClientOwned = service->makeClient("newClient");
+ auto newClient = newClientOwned.get();
+ Client::setCurrent(std::move(newClientOwned));
+
+ const TxnNumber newTxnNum = 10;
+ const auto newSessionId = makeLogicalSessionIdForTest();
+ auto secondPrepareTimestamp = Timestamp();
+
+ {
+ auto newOpCtx = newClient->makeOperationContext();
+ newOpCtx.get()->setLogicalSessionId(newSessionId);
+ newOpCtx.get()->setTxnNumber(newTxnNum);
+
+ OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true);
+ auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
+ newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction");
+
+ // secondPrepareTimestamp should be greater than firstPreparedTimestamp because this
+ // transaction was prepared after.
+ secondPrepareTimestamp = newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
+ ASSERT_GT(secondPrepareTimestamp, firstPrepareTimestamp);
+ // Check that we added a Timestamp to the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
+ totalActiveTxnTS + 1);
+ // totalActiveTxnTS = 2
+ totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ // The oldest prepareTimestamp should still be firstPrepareTimestamp.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(),
+ firstPrepareTimestamp);
+ ASSERT_FALSE(txnParticipant->transactionIsAborted());
+ }
+
+ Client::releaseCurrent();
+ Client::setCurrent(std::move(originalClient));
+
+ // Switch clients and abort the first transaction. This should cause the oldestActiveTS to be
+ // equal to the secondPrepareTimestamp.
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ txnParticipant->abortActiveTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsAborted());
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), secondPrepareTimestamp);
+}
+
+TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) {
+ auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ OperationContextSessionMongod opCtxSession(opCtx(), true, false, true);
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ // Check that there are no Timestamps in the set.
+ unsigned int zero = 0;
+ ASSERT_EQ(totalActiveTxnTS, zero);
+
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ txnParticipant->prepareTransaction(opCtx(), {});
+ // Check that we added a Timestamp to the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
+ // totalActiveTxnTS = 1
+ totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ ASSERT_FALSE(txnParticipant->transactionIsAborted());
+
+ txnParticipant->stashTransactionResources(opCtx());
+ auto originalClient = Client::releaseCurrent();
+
+ /**
+ * Make a new Session, Client, OperationContext and transaction.
+ */
+ auto service = opCtx()->getServiceContext();
+ auto newClientOwned = service->makeClient("newClient");
+ auto newClient = newClientOwned.get();
+ Client::setCurrent(std::move(newClientOwned));
+
+ const TxnNumber newTxnNum = 10;
+ const auto newSessionId = makeLogicalSessionIdForTest();
+
+ {
+ auto newOpCtx = newClient->makeOperationContext();
+ newOpCtx.get()->setLogicalSessionId(newSessionId);
+ newOpCtx.get()->setTxnNumber(newTxnNum);
+
+ OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true);
+ auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get());
+ newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction");
+
+ // secondPrepareTimestamp should be greater than firstPreparedTimestamp because this
+ // transaction was prepared after.
+ newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
+ // Check that we added a Timestamp to the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
+ totalActiveTxnTS + 1);
+ // totalActiveTxnTS = 2
+ totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ // The oldest prepareTimestamp should still be firstPrepareTimestamp.
+ ASSERT_FALSE(txnParticipant->transactionIsAborted());
+
+ // Abort this transaction and check that we have decremented the total active timestamps
+ // count.
+ newTxnParticipant->abortActiveTransaction(newOpCtx.get());
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
+ totalActiveTxnTS - 1);
+ }
+
+ Client::releaseCurrent();
+ Client::setCurrent(std::move(originalClient));
+ // totalActiveTxnTS = 1
+ totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+
+ // Switch clients and abort the first transaction. This means we no longer have an oldest active
+ // timestamp.
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ txnParticipant->abortActiveTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsAborted());
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
+}
+
+
} // namespace
} // namespace mongo