diff options
Diffstat (limited to 'src/mongo/db')
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl.cpp | 30 | ||||
-rw-r--r-- | src/mongo/db/repl/replication_coordinator_impl_test.cpp | 19 | ||||
-rw-r--r-- | src/mongo/db/server_transactions_metrics.cpp | 120 | ||||
-rw-r--r-- | src/mongo/db/server_transactions_metrics.h | 69 | ||||
-rw-r--r-- | src/mongo/db/transaction_metrics_observer.cpp | 37 | ||||
-rw-r--r-- | src/mongo/db/transaction_metrics_observer.h | 27 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.cpp | 43 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant.h | 7 | ||||
-rw-r--r-- | src/mongo/db/transaction_participant_test.cpp | 185 |
9 files changed, 424 insertions, 113 deletions
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp index e036ddbb975..77ebaefdfa3 100644 --- a/src/mongo/db/repl/replication_coordinator_impl.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl.cpp @@ -72,6 +72,7 @@ #include "mongo/db/repl/vote_requester.h" #include "mongo/db/server_options.h" #include "mongo/db/server_parameters.h" +#include "mongo/db/server_transactions_metrics.h" #include "mongo/db/transaction_participant.h" #include "mongo/db/write_concern.h" #include "mongo/db/write_concern_options.h" @@ -3068,7 +3069,23 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inloc maximumStableTimestamp = std::min(maximumStableTimestamp, holdStableTimestamp); } - const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm()); + auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm()); + + // When calculating the stable optime, compare it to the oldest oplog entry timestamp across + // transactions whose corresponding commit/abort oplog entries have not been majority committed. + const auto serverTxnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext()); + const auto oldestNonMajCommittedOpTime = + serverTxnMetrics->getOldestNonMajorityCommittedOpTime(); + + if (oldestNonMajCommittedOpTime) { + if (oldestNonMajCommittedOpTime->getTimestamp() < maximumStableTimestamp) { + // If there is an oldest non-majority committed timestamp that is less than the current + // max stable timestamp, then update the max stable timestamp/optime accordingly. + maximumStableTimestamp = oldestNonMajCommittedOpTime->getTimestamp(); + maximumStableOpTime = + OpTime(maximumStableTimestamp, oldestNonMajCommittedOpTime->getTerm()); + } + } // Find the greatest optime candidate that is less than or equal to the commit point. 
// To do this we first find the upper bound of 'commitPoint', which points to the smallest @@ -3126,6 +3143,16 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() { invariant(snapshotOpTime <= commitPoint); } + // If we advanced the commit point and have prepared transactions, check if their commit or + // abort timestamps are <= the commit point. If so, remove them from our oldest non-majority + // committed optimes set because we know that the commit/abort oplog entries are majority + // committed. + // We must remove these optimes before calling _calculateStableOpTime_inlock because we want + // the stable timestamp to advance up to the commit point if all transactions are + // committed or aborted. + auto txnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext()); + txnMetrics->removeOpTimesLessThanOrEqToCommittedOpTime(commitPoint); + // Compute the current stable optime. auto stableOpTime = _calculateStableOpTime_inlock(_stableOpTimeCandidates, commitPoint); if (stableOpTime) { @@ -3138,7 +3165,6 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() { } void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() { - // Get the current stable optime. 
auto stableOpTime = _getStableOpTime_inlock(); diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp index 13683f2eee9..0da2179b76e 100644 --- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp +++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp @@ -58,6 +58,7 @@ #include "mongo/db/repl/topology_coordinator.h" #include "mongo/db/repl/update_position_args.h" #include "mongo/db/server_options.h" +#include "mongo/db/server_transactions_metrics.h" #include "mongo/db/service_context.h" #include "mongo/db/write_concern_options.h" #include "mongo/executor/network_interface_mock.h" @@ -3787,6 +3788,8 @@ TEST_F(StableOpTimeTest, CalculateStableOpTime) { initReplSetMode(); auto repl = getReplCoord(); + getStorageInterface()->supportsDocLockingBool = true; + auto txnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext()); OpTime commitPoint; boost::optional<OpTime> expectedStableOpTime, stableOpTime; std::set<OpTime> stableOpTimeCandidates; @@ -3849,6 +3852,22 @@ TEST_F(StableOpTimeTest, CalculateStableOpTime) { expectedStableOpTime = OpTime({0, 1}, term); stableOpTime = repl->calculateStableOpTime_forTest(stableOpTimeCandidates, commitPoint); ASSERT_EQ(expectedStableOpTime, stableOpTime); + + // Set the oldest oplog entry OpTime for non-majority committed aborts/commits associated + // with a multi-document transaction to be before the current commit point. We will then + // make expectedStableOpTime the new stable optime because it is the only candidate before + // the oldestNonMajCommittedOpTime. + commitPoint = OpTime({1, 5}, term); + const auto oldestNonMajCommittedOpTime = OpTime({1, 3}, term); + const auto finishOpTime = OpTime({1, 3}, term); + stableOpTimeCandidates = {oldestNonMajCommittedOpTime, OpTime({1, 4}, term)}; + + // Adds the oldestNonMajCommittedOpTime to both sets. + txnMetrics->addActiveOpTime(oldestNonMajCommittedOpTime); + // Update the finishOpTime. 
+ txnMetrics->removeActiveOpTime(oldestNonMajCommittedOpTime, finishOpTime); + stableOpTime = repl->calculateStableOpTime_forTest(stableOpTimeCandidates, commitPoint); + ASSERT_EQ(oldestNonMajCommittedOpTime, stableOpTime); } TEST_F(StableOpTimeTest, CleanupStableOpTimeCandidates) { diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp index 43d1c9d4c9e..3386bc19c42 100644 --- a/src/mongo/db/server_transactions_metrics.cpp +++ b/src/mongo/db/server_transactions_metrics.cpp @@ -113,34 +113,114 @@ void ServerTransactionsMetrics::incrementTotalCommitted() { _totalCommitted.fetchAndAdd(1); } -boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const { - if (_oldestActiveOplogEntryTS.empty()) { +boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const { + if (_oldestActiveOplogEntryOpTimes.empty()) { return boost::none; } - return *(_oldestActiveOplogEntryTS.begin()); + return *(_oldestActiveOplogEntryOpTimes.begin()); } -void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) { - auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS); - // If ret.second is false, the timestamp we tried to insert already existed. - invariant(ret.second == true, - str::stream() << "This oplog entry timestamp already exists." - << "TS: " - << oldestOplogEntryTS.toString()); +void ServerTransactionsMetrics::addActiveOpTime(repl::OpTime oldestOplogEntryOpTime) { + auto ret = _oldestActiveOplogEntryOpTimes.insert(oldestOplogEntryOpTime); + // If ret.second is false, the OpTime we tried to insert already existed. + invariant(ret.second, + str::stream() << "This oplog entry OpTime already exists in " + << "oldestActiveOplogEntryOpTimes." 
+ << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); + + // Add this OpTime to the oldestNonMajorityCommittedOpTimes set with a finishOpTime of + // Timestamp::max() to signify that it has not been committed/aborted. + std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, + repl::OpTime::max()); + auto ret2 = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); + // If ret2.second is false, the OpTime we tried to insert already existed. + invariant(ret2.second, + str::stream() << "This oplog entry OpTime already exists in " + << "oldestNonMajorityCommittedOpTimes." + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); } -void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) { - auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS); - invariant(it != _oldestActiveOplogEntryTS.end(), - str::stream() << "This oplog entry timestamp does not exist " - << "or has already been removed." - << "TS: " - << oldestOplogEntryTS.toString()); - _oldestActiveOplogEntryTS.erase(it); +void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime, + boost::optional<repl::OpTime> finishOpTime) { + auto it = _oldestActiveOplogEntryOpTimes.find(oldestOplogEntryOpTime); + invariant(it != _oldestActiveOplogEntryOpTimes.end(), + str::stream() << "This oplog entry OpTime does not exist in or has already been " + << "removed from oldestActiveOplogEntryOpTimes." + << "OpTime: " + << oldestOplogEntryOpTime.toString()); + _oldestActiveOplogEntryOpTimes.erase(it); + + if (!finishOpTime) { + return; + } + + // The transaction's oldestOplogEntryOpTime now has a corresponding finishTime, which will + // be its commit or abort oplog entry OpTime. Add this pair to the + // oldestNonMajorityCommittedOpTimes. 
+ // Currently, the oldestOplogEntryOpTime will be a prepareOpTime so we will only have a + // finishOpTime if we are committing a prepared transaction or aborting an active prepared + // transaction. + std::pair<repl::OpTime, repl::OpTime> opTimeToRemove(oldestOplogEntryOpTime, + repl::OpTime::max()); + auto it2 = _oldestNonMajorityCommittedOpTimes.find(opTimeToRemove); + invariant(it2 != _oldestNonMajorityCommittedOpTimes.end(), + str::stream() << "This oplog entry OpTime does not exist in or has already been " + << "removed from oldestNonMajorityCommittedOpTimes" + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); + _oldestNonMajorityCommittedOpTimes.erase(it2); + + std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, + *finishOpTime); + auto ret = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); + // If ret.second is false, the OpTime we tried to insert already existed. + invariant(ret.second, + str::stream() << "This oplog entry OpTime pair already exists in " + << "oldestNonMajorityCommittedOpTimes." + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString() + << "finishOpTime: " + << finishOpTime->toString()); +} + +boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestNonMajorityCommittedOpTime() + const { + if (_oldestNonMajorityCommittedOpTimes.empty()) { + return boost::none; + } + const auto oldestNonMajorityCommittedOpTime = _oldestNonMajorityCommittedOpTimes.begin()->first; + invariant(!oldestNonMajorityCommittedOpTime.isNull()); + return oldestNonMajorityCommittedOpTime; +} + +void ServerTransactionsMetrics::removeOpTimesLessThanOrEqToCommittedOpTime( + repl::OpTime committedOpTime) { + // Iterate through oldestNonMajorityCommittedOpTimes and remove all pairs whose "finishOpTime" + // is now less than or equal to the commit point. 
+ for (auto it = _oldestNonMajorityCommittedOpTimes.begin(); + it != _oldestNonMajorityCommittedOpTimes.end();) { + if (it->second <= committedOpTime) { + it = _oldestNonMajorityCommittedOpTimes.erase(it); + } else { + ++it; + } + } +} + +boost::optional<repl::OpTime> +ServerTransactionsMetrics::getFinishOpTimeOfOldestNonMajCommitted_forTest() const { + if (_oldestNonMajorityCommittedOpTimes.empty()) { + return boost::none; + } + const auto oldestNonMajorityCommittedOpTime = + _oldestNonMajorityCommittedOpTimes.begin()->second; + return oldestNonMajorityCommittedOpTime; } -unsigned int ServerTransactionsMetrics::getTotalActiveTS() const { - return _oldestActiveOplogEntryTS.size(); +unsigned int ServerTransactionsMetrics::getTotalActiveOpTimes() const { + return _oldestActiveOplogEntryOpTimes.size(); } void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) { diff --git a/src/mongo/db/server_transactions_metrics.h b/src/mongo/db/server_transactions_metrics.h index f82dfb179e6..7dbc208e0c6 100644 --- a/src/mongo/db/server_transactions_metrics.h +++ b/src/mongo/db/server_transactions_metrics.h @@ -32,8 +32,8 @@ #include <set> -#include "mongo/bson/timestamp.h" #include "mongo/db/operation_context.h" +#include "mongo/db/repl/optime.h" #include "mongo/db/service_context.h" #include "mongo/db/transactions_stats_gen.h" @@ -73,26 +73,56 @@ public: void incrementTotalCommitted(); /** - * Returns the Timestamp of the oldest oplog entry written across all open transactions. - * Returns boost::none if there are no transaction oplog entry Timestamps stored. + * Returns the OpTime of the oldest oplog entry written across all open transactions. + * Returns boost::none if there are no transaction oplog entry OpTimes stored. */ - boost::optional<Timestamp> getOldestActiveTS() const; + boost::optional<repl::OpTime> getOldestActiveOpTime() const; /** - * Add the transaction's oplog entry Timestamp to a set of Timestamps. 
+ * Add the transaction's oplog entry OpTime to oldestActiveOplogEntryOpTimes, a set of OpTimes. + * Also creates a pair with this OpTime and OpTime::max() as the corresponding commit/abort + * oplog entry OpTime. Finally, adds this to oldestNonMajorityCommittedOpTimes. */ - void addActiveTS(Timestamp oldestOplogEntryTS); + void addActiveOpTime(repl::OpTime oldestOplogEntryOpTime); /** - * Remove the corresponding transaction oplog entry Timestamp if the transaction commits or - * aborts. + * Remove the corresponding transaction oplog entry OpTime if the transaction commits or + * aborts. Also updates the pair in oldestNonMajorityCommittedOpTimes with the + * oldestOplogEntryOpTime to have a valid finishOpTime instead of OpTime::max(). It's stored in + * the format: < oldestOplogEntryOpTime, finishOpTime >. */ - void removeActiveTS(Timestamp oldestOplogEntryTS); + void removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime, + boost::optional<repl::OpTime> finishOpTime); /** - * Returns the number of transaction oplog entry Timestamps currently stored. + * Returns the number of transaction oplog entry OpTimes currently stored. */ - unsigned int getTotalActiveTS() const; + unsigned int getTotalActiveOpTimes() const; + + /** + * Returns the oldest oplog entry OpTime across transactions whose corresponding commit or + * abort oplog entry has not been majority committed. + */ + boost::optional<repl::OpTime> getOldestNonMajorityCommittedOpTime() const; + + /** + * Remove the corresponding transaction oplog entry OpTime pair from + * oldestNonMajorityCommittedOpTimes if the transaction is majority committed or aborted. + * We determine this by checking if there are any pairs in the set whose + * 'finishOpTime' <= 'committedOpTime'. + */ + void removeOpTimesLessThanOrEqToCommittedOpTime(repl::OpTime committedOpTime); + + /** + * Testing function that adds an OpTime pair to oldestNonMajorityCommittedOpTimes. 
+ */ + void addNonMajCommittedOpTimePair_forTest(std::pair<repl::OpTime, repl::OpTime> OpTimePair); + + /** + * Testing function that returns the finishOpTime of the oldest non-majority committed OpTime + * pair, which is stored in the form: < oldestOplogEntryOpTime, finishOpTime >. + */ + boost::optional<repl::OpTime> getFinishOpTimeOfOldestNonMajCommitted_forTest() const; /** * Appends the accumulated stats to a transactions stats object. @@ -118,10 +148,21 @@ private: // The total number of multi-document transaction commits. AtomicUInt64 _totalCommitted{0}; - // Maintain the oldest oplog entry Timestamp across all active transactions. Currently, we only + // Maintain the oldest oplog entry OpTime across all active transactions. Currently, we only // write an oplog entry for an ongoing transaction if it is in the `prepare` state. By - // maintaining an ordered set of timestamps, the timestamp at the beginning will be the oldest. - std::set<Timestamp> _oldestActiveOplogEntryTS; + // maintaining an ordered set of OpTimes, the OpTime at the beginning will be the oldest. + std::set<repl::OpTime> _oldestActiveOplogEntryOpTimes; + + // Maintain the oldest oplog entry OpTime across transactions whose corresponding abort/commit + // oplog entries have not been majority committed. Since this is an ordered set, the first + // pair's oldestOplogEntryOpTime represents the earliest OpTime that we should pin the stable + // timestamp behind. + // Each pair is structured as follows: <oldestOplogEntryOpTime, finishOpTime> + // 'oldestOplogEntryOpTime': The first oplog entry OpTime written by a transaction. + // 'finishOpTime': The commit/abort oplog entry OpTime. + // Once the corresponding abort/commit entry has been majority committed, remove the pair from + // this set. 
+ std::set<std::pair<repl::OpTime, repl::OpTime>> _oldestNonMajorityCommittedOpTimes; }; } // namespace mongo diff --git a/src/mongo/db/transaction_metrics_observer.cpp b/src/mongo/db/transaction_metrics_observer.cpp index 10109763fbc..eea17ad5379 100644 --- a/src/mongo/db/transaction_metrics_observer.cpp +++ b/src/mongo/db/transaction_metrics_observer.cpp @@ -98,8 +98,11 @@ void TransactionMetricsObserver::onUnstash(ServerTransactionsMetrics* serverTran void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, + boost::optional<repl::OpTime> commitOpTime, Top* top) { + invariant((oldestOplogEntryOpTime != boost::none && commitOpTime != boost::none) || + (oldestOplogEntryOpTime == boost::none && commitOpTime == boost::none)); // // Per transaction metrics. // @@ -124,16 +127,20 @@ void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTrans durationCount<Microseconds>(_singleTransactionStats.getDuration(tickSource, curTick)); top->incrementGlobalTransactionLatencyStats(static_cast<uint64_t>(duration)); - // Remove this transaction's oldest oplog entry Timestamp if one was written. - if (oldestOplogEntryTS) { - serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + // Remove this transaction's oldest oplog entry OpTime if one was written. 
+ if (oldestOplogEntryOpTime) { + serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, commitOpTime); } } void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, + boost::optional<repl::OpTime> abortOpTime, Top* top) { + invariant((oldestOplogEntryOpTime != boost::none && abortOpTime != boost::none) || + (oldestOplogEntryOpTime == boost::none && abortOpTime == boost::none)); + auto curTick = tickSource->getTicks(); _onAbort(serverTransactionsMetrics, curTick, tickSource, top); // @@ -150,16 +157,16 @@ void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* server // serverTransactionsMetrics->decrementCurrentActive(); - // Remove this transaction's oldest oplog entry Timestamp if one was written. - if (oldestOplogEntryTS) { - serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + // Remove this transaction's oldest oplog entry OpTime if one was written. + if (oldestOplogEntryOpTime) { + serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, abortOpTime); } } void TransactionMetricsObserver::onAbortInactive( ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, Top* top) { auto curTick = tickSource->getTicks(); _onAbort(serverTransactionsMetrics, curTick, tickSource, top); @@ -169,9 +176,9 @@ void TransactionMetricsObserver::onAbortInactive( // serverTransactionsMetrics->decrementCurrentInactive(); - // Remove this transaction's oldest oplog entry Timestamp if one was written. - if (oldestOplogEntryTS) { - serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS); + // Remove this transaction's oldest oplog entry OpTime if one was written. 
+ if (oldestOplogEntryOpTime) { + serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, boost::none); } } @@ -208,11 +215,11 @@ void TransactionMetricsObserver::_onAbort(ServerTransactionsMetrics* serverTrans } void TransactionMetricsObserver::onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics, - Timestamp prepareTimestamp) { + repl::OpTime prepareOpTime) { // Since we currently only write an oplog entry for an in progress transaction when it is in - // the prepare state, the prepareTimestamp is currently the oldest timestamp written to the + // the prepare state, the prepareOpTime is currently the oldest OpTime written to the // oplog for this transaction. - serverTransactionsMetrics->addActiveTS(prepareTimestamp); + serverTransactionsMetrics->addActiveOpTime(prepareOpTime); } } // namespace mongo diff --git a/src/mongo/db/transaction_metrics_observer.h b/src/mongo/db/transaction_metrics_observer.h index d1a3bba4244..715aa14244f 100644 --- a/src/mongo/db/transaction_metrics_observer.h +++ b/src/mongo/db/transaction_metrics_observer.h @@ -70,37 +70,46 @@ public: /** * Updates relevant metrics when a transaction commits. Also removes this transaction's oldest - * oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. + * oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not boost::none. + * Finally, updates an entry in oldestNonMajorityCommittedOpTimes to include its commit OpTime. */ void onCommit(ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, + boost::optional<repl::OpTime> commitOpTime, Top* top); /** * Updates relevant metrics when an active transaction aborts. Also removes this transaction's - * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. 
+ * oldest oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not + * boost::none. + * Finally, updates an entry in oldestNonMajorityCommittedOpTimes to include its abort OpTime. */ void onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, + boost::optional<repl::OpTime> abortOpTime, Top* top); /** * Updates relevant metrics when an inactive transaction aborts. Also removes this transaction's - * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none. + * oldest oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not + * boost::none. + * Does not accept an optional abortOpTime parameter because we cannot abort an inactive + * prepared transaction. Instead, uses boost::none as the abortOpTime, which subsequently will + * not modify oldestNonMajorityCommittedOpTimes. */ void onAbortInactive(ServerTransactionsMetrics* serverTransactionsMetrics, TickSource* tickSource, - boost::optional<Timestamp> oldestOplogEntryTS, + boost::optional<repl::OpTime> oldestOplogEntryOpTime, Top* top); /** - * Adds the prepareTimestamp, which is currently the Timestamp of the first oplog entry written - * by an active transaction, to the oldestActiveOplogEntryTS set. + * Adds the prepareOpTime, which is currently the OpTime of the first oplog entry written by + * an active transaction, to the oldestActiveOplogEntryOpTimes set. */ void onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics, - Timestamp prepareTimestamp); + repl::OpTime prepareOpTime); /** * Updates relevant metrics when an operation running on the transaction completes. 
An operation diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp index 0ca9a2367d9..95cbf2dd09b 100644 --- a/src/mongo/db/transaction_participant.cpp +++ b/src/mongo/db/transaction_participant.cpp @@ -823,20 +823,20 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx, abortGuard.Dismiss(); - invariant(!_oldestOplogEntryTS, + invariant(!_oldestOplogEntryOpTime, str::stream() << "This transaction's oldest oplog entry Timestamp has already " << "been set to: " - << _oldestOplogEntryTS->toString()); + << _oldestOplogEntryOpTime->toString()); // Keep track of the Timestamp from the first oplog entry written by this transaction. - _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp(); + _oldestOplogEntryOpTime = prepareOplogSlot.opTime; - // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently + // Maintain the OpTime of the oldest active oplog entry for this transaction. We currently // only write an oplog entry for an in progress transaction when it is in the prepare state // but this will change when we allow multiple oplog entries per transaction. { stdx::lock_guard<stdx::mutex> lm(_metricsMutex); _transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx), - *_oldestOplogEntryTS); + *_oldestOplogEntryOpTime); } return prepareOplogSlot.opTime.getTimestamp(); @@ -895,10 +895,10 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx !_txnState.isPrepared(lk)); // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. - invariant(!_oldestOplogEntryTS, + invariant(!_oldestOplogEntryOpTime, str::stream() << "The oldest oplog entry Timestamp should not have been set because " << "this transaction is not prepared. 
But, it is currently " - << _oldestOplogEntryTS->toString()); + << _oldestOplogEntryOpTime->toString()); // We need to unlock the session to run the opObserver onTransactionCommit, which calls back // into the session. @@ -967,6 +967,11 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx, lk.lock(); _checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true); + // If we are committing a prepared transaction, then we must have already recorded this + // transaction's oldest oplog entry optime. + invariant(_oldestOplogEntryOpTime); + _finishOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + _finishCommitTransaction(lk, opCtx); } catch (...) { // It is illegal for committing a prepared transaction to fail for any reason, other than an @@ -1015,7 +1020,8 @@ void TransactionParticipant::_finishCommitTransaction(WithLock lk, OperationCont auto tickSource = opCtx->getServiceContext()->getTickSource(); _transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx), tickSource, - _oldestOplogEntryTS, + _oldestOplogEntryOpTime, + _finishOpTime, &Top::get(getGlobalServiceContext())); _transactionMetricsObserver.onTransactionOperation( opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics); @@ -1097,10 +1103,10 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction( } // TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB. - invariant(!_oldestOplogEntryTS, + invariant(!_oldestOplogEntryOpTime, str::stream() << "The oldest oplog entry Timestamp should not have been set because " << "this transaction is not prepared. But, it is currently " - << _oldestOplogEntryTS->toString()); + << _oldestOplogEntryOpTime->toString()); _abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress); } catch (...) 
{ @@ -1149,6 +1155,12 @@ void TransactionParticipant::_abortActiveTransaction(stdx::unique_lock<stdx::mut lock.lock(); // We do not check if the active transaction number is correct here because we handle it below. + // Set the finishOpTime of this transaction if we have recorded this transaction's oldest oplog + // entry optime. + if (_oldestOplogEntryOpTime) { + _finishOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp(); + } + // Only abort the transaction in session if it's in expected states. // When the state of active transaction on session is not expected, it means another // thread has already aborted the transaction on session. @@ -1185,7 +1197,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortInactive( ServerTransactionsMetrics::get(getGlobalServiceContext()), tickSource, - _oldestOplogEntryTS, + _oldestOplogEntryOpTime, &Top::get(getGlobalServiceContext())); } _logSlowTransaction(wl, @@ -1198,7 +1210,8 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionMetricsObserver.onAbortActive( ServerTransactionsMetrics::get(getGlobalServiceContext()), tickSource, - _oldestOplogEntryTS, + _oldestOplogEntryOpTime, + _finishOpTime, &Top::get(getGlobalServiceContext())); } @@ -1206,7 +1219,8 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) { _transactionOperations.clear(); _txnState.transitionTo(wl, TransactionState::kAborted); _prepareOpTime = repl::OpTime(); - _oldestOplogEntryTS = boost::none; + _oldestOplogEntryOpTime = boost::none; + _finishOpTime = boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); } @@ -1537,7 +1551,8 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN // Reset the transactional state _txnState.transitionTo(wl, TransactionState::kNone); _prepareOpTime = repl::OpTime(); - _oldestOplogEntryTS = boost::none; + _oldestOplogEntryOpTime = boost::none; + _finishOpTime = 
boost::none; _speculativeTransactionReadOpTime = repl::OpTime(); _multikeyPathInfo.clear(); _autoCommit = boost::none; diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h index 451f502b5bb..a9830833c17 100644 --- a/src/mongo/db/transaction_participant.h +++ b/src/mongo/db/transaction_participant.h @@ -800,8 +800,11 @@ private: // Tracks and updates transaction metrics upon the appropriate transaction event. TransactionMetricsObserver _transactionMetricsObserver; - // Tracks the Timestamp of the first oplog entry written by this TransactionParticipant. - boost::optional<Timestamp> _oldestOplogEntryTS; + // Tracks the OpTime of the first oplog entry written by this TransactionParticipant. + boost::optional<repl::OpTime> _oldestOplogEntryOpTime; + + // Tracks the OpTime of the abort/commit oplog entry associated with this transaction. + boost::optional<repl::OpTime> _finishOpTime; }; } // namespace mongo diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp index 11c35d5f597..70c5fe92ce2 100644 --- a/src/mongo/db/transaction_participant_test.cpp +++ b/src/mongo/db/transaction_participant_test.cpp @@ -41,6 +41,7 @@ #include "mongo/db/repl/oplog.h" #include "mongo/db/repl/oplog_entry.h" #include "mongo/db/repl/optime.h" +#include "mongo/db/repl/repl_client_info.h" #include "mongo/db/server_transactions_metrics.h" #include "mongo/db/service_context.h" #include "mongo/db/session_catalog_mongod.h" @@ -882,7 +883,8 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) { auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); // Check that the oldest prepareTimestamp is the one we just set. 
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); ASSERT(_opObserver->transactionPrepared); ASSERT_FALSE(txnParticipant->transactionIsAborted()); } @@ -1078,7 +1080,8 @@ TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) { auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); // Check that the oldest prepareTimestamp is the one we just set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); txnParticipant->abortArbitraryTransaction(); @@ -1104,7 +1107,8 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) { auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); // Check that the oldest prepareTimestamp is the one we just set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); txnParticipant->abortArbitraryTransactionIfExpired(); @@ -1132,7 +1136,8 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp); // Check that the oldest prepareTimestamp is the one we just set. 
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); txnParticipant->stashTransactionResources(opCtx()); @@ -1223,7 +1228,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); // Check that the oldest prepareTimestamp is the one we just set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); auto sessionId = *opCtx()->getLogicalSessionId(); auto txnNum = *opCtx()->getTxnNumber(); @@ -1242,7 +1248,7 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp); // Check that we removed the prepareTimestamp from the set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none); } TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) { @@ -2975,23 +2981,20 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) { } TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) { - auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo()); auto txnParticipant = TransactionParticipant::get(opCtx()); // Check that there are no Timestamps in the set. 
- unsigned int zero = 0; - ASSERT_EQ(totalActiveTxnTS, zero); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U); txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); auto firstPrepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); // Check that we added a Timestamp to the set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1); - // totalActiveTxnTS = 1 - totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U); // Check that the oldest prepareTimestamp is equal to firstPrepareTimestamp because there is // only one prepared transaction on this Service. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), firstPrepareTimestamp); + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), firstPrepareTimestamp); ASSERT_FALSE(txnParticipant->transactionIsAborted()); txnParticipant->stashTransactionResources(opCtx()); @@ -3023,13 +3026,10 @@ TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) { secondPrepareTimestamp = newTxnParticipant->prepareTransaction(newOpCtx.get(), {}); ASSERT_GT(secondPrepareTimestamp, firstPrepareTimestamp); // Check that we added a Timestamp to the set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), - totalActiveTxnTS + 1); - // totalActiveTxnTS = 2 - totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 2U); // The oldest prepareTimestamp should still be firstPrepareTimestamp. 
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), - firstPrepareTimestamp); + prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), firstPrepareTimestamp); ASSERT_FALSE(txnParticipant->transactionIsAborted()); } @@ -3041,25 +3041,22 @@ TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) { txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); txnParticipant->abortActiveTransaction(opCtx()); ASSERT(txnParticipant->transactionIsAborted()); - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1); - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), secondPrepareTimestamp); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U); + prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), secondPrepareTimestamp); } TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) { - auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo()); auto txnParticipant = TransactionParticipant::get(opCtx()); // Check that there are no Timestamps in the set. - unsigned int zero = 0; - ASSERT_EQ(totalActiveTxnTS, zero); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U); txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); txnParticipant->prepareTransaction(opCtx(), {}); // Check that we added a Timestamp to the set. 
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1); - // totalActiveTxnTS = 1 - totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U); ASSERT_FALSE(txnParticipant->transactionIsAborted()); txnParticipant->stashTransactionResources(opCtx()); @@ -3089,32 +3086,146 @@ TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) { // transaction was prepared after. newTxnParticipant->prepareTransaction(newOpCtx.get(), {}); // Check that we added a Timestamp to the set. - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), - totalActiveTxnTS + 1); - // totalActiveTxnTS = 2 - totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 2U); // The oldest prepareTimestamp should still be firstPrepareTimestamp. ASSERT_FALSE(txnParticipant->transactionIsAborted()); // Abort this transaction and check that we have decremented the total active timestamps // count. newTxnParticipant->abortActiveTransaction(newOpCtx.get()); - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), - totalActiveTxnTS - 1); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U); } Client::releaseCurrent(); Client::setCurrent(std::move(originalClient)); - // totalActiveTxnTS = 1 - totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(); // Switch clients and abort the first transaction. This means we no longer have an oldest active // timestamp. 
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); txnParticipant->abortActiveTransaction(opCtx()); ASSERT(txnParticipant->transactionIsAborted()); - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1); - ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none); +} + +TEST_F(TxnParticipantTest, ProperlyMaintainOldestNonMajorityCommittedOpTimeSet) { + OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo()); + auto txnParticipant = TransactionParticipant::get(opCtx()); + + // Check that there are no Timestamps in the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U); + + txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction"); + auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {}); + // Check that we added a Timestamp to the set. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U); + + // Check that the oldest prepareTimestamp is equal to first prepareTimestamp because there is + // only one prepared transaction on this Service. + auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(); + ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp); + + // Check that oldestNonMajorityCommittedOpTimes also has this prepareTimestamp and that the + // pair's finishOpTime is Timestamp::max() because this transaction has not been + // committed/aborted. 
+ auto nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + ASSERT_EQ(nonMajorityCommittedOpTime->getTimestamp(), prepareTimestamp); + auto nonMajorityCommittedOpTimeFinishOpTime = + ServerTransactionsMetrics::get(opCtx())->getFinishOpTimeOfOldestNonMajCommitted_forTest(); + ASSERT_EQ(nonMajorityCommittedOpTimeFinishOpTime->getTimestamp(), Timestamp::max()); + + ASSERT_FALSE(txnParticipant->transactionIsAborted()); + // Since this test uses a mock opObserver, we have to manually set the finishTimestamp on the + // txnParticipant. + auto finishOpTime = repl::OpTime({10, 10}, 0); + repl::ReplClientInfo::forClient(opCtx()->getClient()).setLastOp(finishOpTime); + + txnParticipant->abortActiveTransaction(opCtx()); + ASSERT(txnParticipant->transactionIsAborted()); + + // Make sure that we moved the OpTime from the oldestActiveOplogEntryOpTimes to + // oldestNonMajorityCommittedOpTimes along with the abort/commit oplog entry OpTime + // associated with the transaction. + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U); + ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none); + + nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + nonMajorityCommittedOpTimeFinishOpTime = + ServerTransactionsMetrics::get(opCtx())->getFinishOpTimeOfOldestNonMajCommitted_forTest(); + ASSERT_FALSE(nonMajorityCommittedOpTime == boost::none); + ASSERT_FALSE(nonMajorityCommittedOpTimeFinishOpTime == boost::none); + ASSERT_EQ(nonMajorityCommittedOpTime->getTimestamp(), prepareTimestamp); + ASSERT_EQ(nonMajorityCommittedOpTimeFinishOpTime, finishOpTime); + + // If we pass in a mock commit point that is greater than the finish timestamp of the + // oldestNonMajorityCommittedOpTime, it should be removed from the set. This would mean that + // the abort/commit oplog entry is majority committed. 
+ ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime( + repl::OpTime::max()); + nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + ASSERT_EQ(nonMajorityCommittedOpTime, boost::none); +} + +TEST_F(TxnParticipantTest, GetOldestNonMajorityCommittedOpTimeReturnsOldestEntry) { + const auto earlierOpTime = repl::OpTime({1, 1}, 0); + const auto earlierFinishOpTime = repl::OpTime({3, 2}, 0); + + const auto middleOpTime = repl::OpTime({1, 2}, 0); + const auto middleFinishOpTime = repl::OpTime({3, 3}, 0); + + const auto laterOpTime = repl::OpTime({1, 3}, 0); + const auto laterFinishOpTime = repl::OpTime({3, 4}, 0); + + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(earlierOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(earlierOpTime, earlierFinishOpTime); + + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(middleOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(middleOpTime, middleFinishOpTime); + + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(laterOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(laterOpTime, laterFinishOpTime); + + auto nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + + ASSERT_EQ(*nonMajorityCommittedOpTime, repl::OpTime({1, 1}, 0)); + + // If we pass in a mock commit point that is greater than the finish timestamp of the + // oldestNonMajorityCommittedOpTime, it should be removed from the set. This would mean that + // the abort/commit oplog entry is majority committed. 
+ ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime( + repl::OpTime::max()); + nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + ASSERT_EQ(nonMajorityCommittedOpTime, boost::none); + + // Test that we can remove only a part of the set by passing in a commit point that is only + // greater than or equal to two of the optimes. + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(earlierOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(earlierOpTime, earlierFinishOpTime); + + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(middleOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(middleOpTime, middleFinishOpTime); + + ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(laterOpTime); + ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(laterOpTime, laterFinishOpTime); + + nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + + ASSERT_EQ(*nonMajorityCommittedOpTime, earlierOpTime); + + ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime( + repl::OpTime({3, 3}, 0)); + nonMajorityCommittedOpTime = + ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime(); + + // earlierOpTime and middleOpTime must have been removed because their finishOpTime are less + // than or equal to the mock commit point. + ASSERT_EQ(nonMajorityCommittedOpTime, laterOpTime); } |