diff options
author | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-09-20 11:06:37 -0400 |
---|---|---|
committer | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-09-20 11:08:04 -0400 |
commit | dc4fd67df2cbfdd4977007463cce8bbb25ffdd4d (patch) | |
tree | 55c1c21f2ccad68966e96f3d12e869daf5f9245b | |
parent | c69ec130f14953056461cf40ddf368d8ecf7185b (diff) | |
download | mongo-dc4fd67df2cbfdd4977007463cce8bbb25ffdd4d.tar.gz |
SERVER-35873 Maintain oldest prepareTimestamp
-rw-r--r-- | src/mongo/db/server_transactions_metrics.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/server_transactions_metrics.h | 30 | ||||
-rw-r--r-- | src/mongo/db/transaction_metrics_observer.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/transaction_metrics_observer.h | 19 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 3 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 158 |
7 files changed, 304 insertions, 9 deletions
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp index b573996b50b..9fb9f1dee0c 100644 --- a/src/mongo/db/server_transactions_metrics.cpp +++ b/src/mongo/db/server_transactions_metrics.cpp @@ -111,6 +111,36 @@ void ServerTransactionsMetrics::incrementTotalCommitted() { _totalCommitted.fetchAndAdd(1); } +boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const { + if (_oldestActiveOplogEntryTS.empty()) { + return boost::none; + } + return *(_oldestActiveOplogEntryTS.begin()); +} + +void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) { + auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS); + // If ret.second is false, the timestamp we tried to insert already existed. + invariant(ret.second == true, + str::stream() << "This oplog entry timestamp already exists." + << "TS: " + << oldestOplogEntryTS.toString()); +} + +void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) { + auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS); + invariant(it != _oldestActiveOplogEntryTS.end(), + str::stream() << "This oplog entry timestamp does not exist " + << "or has already been removed." + << "TS: " + << oldestOplogEntryTS.toString()); + _oldestActiveOplogEntryTS.erase(it); +} + +unsigned int ServerTransactionsMetrics::getTotalActiveTS() const { + return _oldestActiveOplogEntryTS.size(); +} + void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) { stats->setCurrentActive(_currentActive.load()); stats->setCurrentInactive(_currentInactive.load()); diff --git a/src/mongo/db/server_transactions_metrics.h b/src/mongo/db/server_transactions_metrics.h index f877a1f8b23..a026d921f6e 100644 --- a/src/mongo/db/server_transactions_metrics.h +++ b/src/mongo/db/server_transactions_metrics.h @@ -28,6 +28,9 @@ #pragma once +#include <set> + +#include "mongo/bson/timestamp.h" #include "mongo/db/operation_context.h" #include "mongo/db/service_context.h" #include "mongo/db/transactions_stats_gen.h" @@ -68,6 +71,28 @@ public: void incrementTotalCommitted(); /** + * Returns the Timestamp of the oldest oplog entry written across all open transactions. + * Returns boost::none if there are no transaction oplog entry Timestamps stored. + */ + boost::optional<Timestamp> getOldestActiveTS() const; + + /** + * Add the transaction's oplog entry Timestamp to a set of Timestamps. + */ + void addActiveTS(Timestamp oldestOplogEntryTS); + + /** + * Remove the corresponding transaction oplog entry Timestamp if the transaction commits or + * aborts. + */ + void removeActiveTS(Timestamp oldestOplogEntryTS); + + /** + * Returns the number of transaction oplog entry Timestamps currently stored. + */ + unsigned int getTotalActiveTS() const; + + /** * Appends the accumulated stats to a transactions stats object. */ void updateStats(TransactionsStats* stats); @@ -90,6 +115,11 @@ private: // The total number of multi-document transaction commits. AtomicUInt64 _totalCommitted{0}; + + // Maintain the oldest oplog entry Timestamp across all active transactions. Currently, we only + // write an oplog entry for an ongoing transaction if it is in the `prepare` state. By + // maintaining an ordered set of timestamps, the timestamp at the beginning will be the oldest. + std::set<Timestamp> _oldestActiveOplogEntryTS; }; } // namespace mongo diff --git a/src/mongo/db/transaction_metrics_observer.cpp b/src/mongo/db/transaction_metrics_observer.cpp index ecc7c8bb225..0eec2a3d1a7 100644 --- a/src/mongo/db/transaction_metrics_observer.cpp +++ b/src/mongo/db/transaction_metrics_observer.cpp @@ -95,6 +95,7 @@ void TransactionMetricsObserver::onUnstash(ServerTransactionsMetrics* serverTran void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, Top* top) { // // Per transaction metrics. @@ -116,10 +117,16 @@ void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTrans serverTransactionsMetrics->decrementCurrentActive(); top->incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(curTime)); + + // Remove this transaction's oldest oplog entry Timestamp if one was written. + if (oldestOplogEntryTS) { + serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + } } void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, Top* top) { _onAbort(serverTransactionsMetrics, curTime, top); // @@ -135,16 +142,29 @@ void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* server // Server wide transactions metrics. // serverTransactionsMetrics->decrementCurrentActive(); + + // Remove this transaction's oldest oplog entry Timestamp if one was written. + if (oldestOplogEntryTS) { + serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + } } void TransactionMetricsObserver::onAbortInactive( - ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, Top* top) { + ServerTransactionsMetrics* serverTransactionsMetrics, + unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, + Top* top) { _onAbort(serverTransactionsMetrics, curTime, top); // // Server wide transactions metrics. // serverTransactionsMetrics->decrementCurrentInactive(); + + // Remove this transaction's oldest oplog entry Timestamp if one was written. + if (oldestOplogEntryTS) { + serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + } } void TransactionMetricsObserver::onTransactionOperation(Client* client, @@ -176,4 +196,12 @@ void TransactionMetricsObserver::_onAbort(ServerTransactionsMetrics* serverTrans top->incrementGlobalTransactionLatencyStats(_singleTransactionStats.getDuration(curTime)); } +void TransactionMetricsObserver::onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics, + Timestamp prepareTimestamp) { + // Since we currently only write an oplog entry for an in progress transaction when it is in + // the prepare state, the prepareTimestamp is currently the oldest timestamp written to the + // oplog for this transaction. + serverTransactionsMetrics->addActiveTS(prepareTimestamp); +} + } // namespace mongo diff --git a/src/mongo/db/transaction_metrics_observer.h b/src/mongo/db/transaction_metrics_observer.h index e3de57539b6..1ec30bedfff 100644 --- a/src/mongo/db/transaction_metrics_observer.h +++ b/src/mongo/db/transaction_metrics_observer.h @@ -67,27 +67,40 @@ public: unsigned long long curTime); /** - * Updates relevant metrics when a transaction commits. + * Updates relevant metrics when a transaction commits. Also removes this transaction's oldest + * oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. */ void onCommit(ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, Top* top); /** - * Updates relevant metrics when an active transaction aborts. + * Updates relevant metrics when an active transaction aborts. Also removes this transaction's + * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. */ void onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, Top* top); /** - * Updates relevant metrics when an inactive transaction aborts. + * Updates relevant metrics when an inactive transaction aborts. Also removes this transaction's + * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. */ void onAbortInactive(ServerTransactionsMetrics* serverTransactionsMetrics, unsigned long long curTime, + boost::optional<Timestamp> oldestOplogEntryTS, Top* top); /** + * Adds the prepareTimestamp, which is currently the Timestamp of the first oplog entry written + * by an active transaction, to the oldestActiveOplogEntryTS set. + */ + void onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics, + Timestamp prepareTimestamp); + + /** * Updates relevant metrics when an operation running on the transaction completes. An operation * may be a read/write operation, or an abort/commit command. */ diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 62296b07c24..a046061cd3f 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -463,7 +463,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx // Always check session's txnNumber and '_txnState', since they can be modified by session // kill and migration, which do not check out the session. _checkIsActiveTransaction(lg, *opCtx->getTxnNumber(), false); - // If this is not a multi-document transaction, there is nothing to unstash. if (_txnState.isNone(lg)) { invariant(!_txnResourceStash); @@ -475,7 +474,6 @@ void TransactionParticipant::unstashTransactionResources(OperationContext* opCtx if (_txnResourceStash) { // Transaction resources already exist for this transaction. Transfer them from the // stash to the operation context. - auto& readConcernArgs = repl::ReadConcernArgs::get(opCtx); uassert(ErrorCodes::InvalidOptions, "Only the first command in a transaction may specify a readConcern", @@ -595,6 +593,23 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, opCtx->getServiceContext()->getOpObserver()->onTransactionPrepare(opCtx, prepareOplogSlot); abortGuard.Dismiss(); + + invariant(!_oldestOplogEntryTS, + str::stream() << "This transaction's oldest oplog entry Timestamp has already " + << "been set to: " + << _oldestOplogEntryTS->toString()); + // Keep track of the Timestamp from the first oplog entry written by this transaction. + _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp(); + + // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently + // only write an oplog entry for an in progress transaction when it is in the prepare state + // but this will change when we allow multiple oplog entries per transaction. + { + stdx::lock_guard<stdx::mutex> lm(_metricsMutex); + _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx), + *_oldestOplogEntryTS); + } + return prepareOplogSlot.opTime.getTimestamp(); } @@ -644,11 +659,16 @@ std::vector<repl::ReplOperation> TransactionParticipant::endTransactionAndRetrie void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx) { stdx::unique_lock<stdx::mutex> lk(_mutex); - uassert(ErrorCodes::InvalidOptions, "commitTransaction must provide commitTimestamp to prepared transaction.", !_txnState.isPrepared(lk)); + // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. + invariant(!_oldestOplogEntryTS, + str::stream() << "The oldest oplog entry Timestamp should not have been set because " + << "this transaction is not prepared. But, it is currently " + << _oldestOplogEntryTS->toString()); + // Always check session's txnNumber and '_txnState', since they can be modified by session kill // and migration, which do not check out the session. _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); @@ -743,8 +763,10 @@ void TransactionParticipant::_commitTransaction(stdx::unique_lock<stdx::mutex> l const auto curTime = curTimeMicros64(); { stdx::lock_guard<stdx::mutex> lm(_metricsMutex); - _transactionMetricsObserver.onCommit( - ServerTransactionsMetrics::get(opCtx), curTime, &Top::get(getGlobalServiceContext())); + _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx), + curTime, + _oldestOplogEntryTS, + &Top::get(getGlobalServiceContext())); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); } @@ -823,6 +845,13 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction( _stashActiveTransaction(lock, opCtx); return; } + + // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. + invariant(!_oldestOplogEntryTS, + str::stream() << "The oldest oplog entry Timestamp should not have been set because " + << "this transaction is not prepared. But, it is currently " + << _oldestOplogEntryTS->toString()); + _abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress); } catch (...) { // It is illegal for this to throw so we catch and log this here for diagnosability. @@ -897,6 +926,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortInactive( ServerTransactionsMetrics::get(getGlobalServiceContext()), curTime, + _oldestOplogEntryTS, &Top::get(getGlobalServiceContext())); } _logSlowTransaction(wl, @@ -909,6 +939,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortActive( ServerTransactionsMetrics::get(getGlobalServiceContext()), curTime, + _oldestOplogEntryTS, &Top::get(getGlobalServiceContext())); } @@ -916,6 +947,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionOperations.clear(); _txnState.transitionTo(wl, TransactionState::kAborted); _prepareOpTime = repl::OpTime(); + _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _getSession()->unlockTxnNumber(); @@ -1290,6 +1322,7 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN _transactionMetricsObserver.resetSingleTransactionStats(txnNumber); } _prepareOpTime = repl::OpTime(); + _oldestOplogEntryTS = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 57b1884db25..5068a658b20 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -626,6 +626,9 @@ private: // Tracks and updates transaction metrics upon the appropriate transaction event. TransactionMetricsObserver _transactionMetricsObserver; + + // Tracks the Timestamp of the first oplog entry written by this TransactionParticipant. + boost::optional<Timestamp> _oldestOplogEntryTS; }; } // namespace mongo diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index e3a64a45cdc..26d1f5e32e8 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -985,6 +985,8 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { // Check that prepareTimestamp gets set. auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); + // Check that the oldest prepareTimestamp is the one we just set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); ASSERT(_opObserver->transactionPrepared); ASSERT_FALSE(txnParticipant->transactionIsAborted()); } @@ -1171,6 +1173,8 @@ TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) { // Check that prepareTimestamp gets set. auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); + // Check that the oldest prepareTimestamp is the one we just set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); txnParticipant->abortArbitraryTransaction(); @@ -1195,6 +1199,8 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) { // Check that prepareTimestamp gets set. auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); + // Check that the oldest prepareTimestamp is the one we just set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); txnParticipant->abortArbitraryTransactionIfExpired(); @@ -1221,6 +1227,9 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); + // Check that the oldest prepareTimestamp is the one we just set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + txnParticipant->stashTransactionResources(opCtx()); { @@ -1307,6 +1316,9 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti txnParticipant->addTransactionOperation(opCtx(), operation); auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); + // Check that the oldest prepareTimestamp is the one we just set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto sessionId = *opCtx()->getLogicalSessionId(); auto txnNum = *opCtx()->getTxnNumber(); _opObserver->onTransactionCommitFn = [&](bool wasPrepared) { @@ -1322,6 +1334,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti }; txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp); + // Check that we removed the prepareTimestamp from the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none); } TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { @@ -2838,5 +2852,149 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) { ASSERT_EQUALS(1, countLogLinesContaining(expectedTransactionInfo)); } +TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) { + auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + OperationContextSessionMongod opCtxSession(opCtx(), true, false, true); + auto txnParticipant = TransactionParticipant::get(opCtx()); + + // Check that there are no Timestamps in the set. + unsigned int zero = 0; + ASSERT_EQ(totalActiveTxnTS, zero); + + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + auto firstPrepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); + // Check that we added a Timestamp to the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1); + // totalActiveTxnTS = 1 + totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + // Check that the oldest prepareTimestamp is equal to firstPrepareTimestamp because there is + // only one prepared transaction on this Service. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), firstPrepareTimestamp); + ASSERT_FALSE(txnParticipant->transactionIsAborted()); + + txnParticipant->stashTransactionResources(opCtx()); + auto originalClient = Client::releaseCurrent(); + + /** + * Make a new Session, Client, OperationContext and transaction. + */ + auto service = opCtx()->getServiceContext(); + auto newClientOwned = service->makeClient("newClient"); + auto newClient = newClientOwned.get(); + Client::setCurrent(std::move(newClientOwned)); + + const TxnNumber newTxnNum = 10; + const auto newSessionId = makeLogicalSessionIdForTest(); + auto secondPrepareTimestamp = Timestamp(); + + { + auto newOpCtx = newClient->makeOperationContext(); + newOpCtx.get()->setLogicalSessionId(newSessionId); + newOpCtx.get()->setTxnNumber(newTxnNum); + + OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true); + auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get()); + newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction"); + + // secondPrepareTimestamp should be greater than firstPreparedTimestamp because this + // transaction was prepared after. + secondPrepareTimestamp = newTxnParticipant->prepareTransaction(newOpCtx.get(), {}); + ASSERT_GT(secondPrepareTimestamp, firstPrepareTimestamp); + // Check that we added a Timestamp to the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), + totalActiveTxnTS + 1); + // totalActiveTxnTS = 2 + totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + // The oldest prepareTimestamp should still be firstPrepareTimestamp. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), + firstPrepareTimestamp); + ASSERT_FALSE(txnParticipant->transactionIsAborted()); + } + + Client::releaseCurrent(); + Client::setCurrent(std::move(originalClient)); + + // Switch clients and abort the first transaction. This should cause the oldestActiveTS to be + // equal to the secondPrepareTimestamp. + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + txnParticipant->abortActiveTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsAborted()); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), secondPrepareTimestamp); +} + +TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) { + auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + OperationContextSessionMongod opCtxSession(opCtx(), true, false, true); + auto txnParticipant = TransactionParticipant::get(opCtx()); + + // Check that there are no Timestamps in the set. + unsigned int zero = 0; + ASSERT_EQ(totalActiveTxnTS, zero); + + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + txnParticipant->prepareTransaction(opCtx(), {}); + // Check that we added a Timestamp to the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1); + // totalActiveTxnTS = 1 + totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + ASSERT_FALSE(txnParticipant->transactionIsAborted()); + + txnParticipant->stashTransactionResources(opCtx()); + auto originalClient = Client::releaseCurrent(); + + /** + * Make a new Session, Client, OperationContext and transaction. + */ + auto service = opCtx()->getServiceContext(); + auto newClientOwned = service->makeClient("newClient"); + auto newClient = newClientOwned.get(); + Client::setCurrent(std::move(newClientOwned)); + + const TxnNumber newTxnNum = 10; + const auto newSessionId = makeLogicalSessionIdForTest(); + + { + auto newOpCtx = newClient->makeOperationContext(); + newOpCtx.get()->setLogicalSessionId(newSessionId); + newOpCtx.get()->setTxnNumber(newTxnNum); + + OperationContextSessionMongod newOpCtxSession(newOpCtx.get(), true, false, true); + auto newTxnParticipant = TransactionParticipant::get(newOpCtx.get()); + newTxnParticipant->unstashTransactionResources(newOpCtx.get(), "prepareTransaction"); + + // secondPrepareTimestamp should be greater than firstPreparedTimestamp because this + // transaction was prepared after. + newTxnParticipant->prepareTransaction(newOpCtx.get(), {}); + // Check that we added a Timestamp to the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), + totalActiveTxnTS + 1); + // totalActiveTxnTS = 2 + totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + // The oldest prepareTimestamp should still be firstPrepareTimestamp. + ASSERT_FALSE(txnParticipant->transactionIsAborted()); + + // Abort this transaction and check that we have decremented the total active timestamps + // count. + newTxnParticipant->abortActiveTransaction(newOpCtx.get()); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), + totalActiveTxnTS - 1); + } + + Client::releaseCurrent(); + Client::setCurrent(std::move(originalClient)); + // totalActiveTxnTS = 1 + totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + + // Switch clients and abort the first transaction. This means we no longer have an oldest active + // timestamp. + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + txnParticipant->abortActiveTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsAborted()); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none); +} + + } // namespace } // namespace mongo |