summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--jstests/replsets/commit_transaction_recovery.js3
-rw-r--r--jstests/replsets/commit_transaction_recovery_data_already_applied.js3
-rw-r--r--jstests/replsets/prepare_conflict_read_concern_behavior.js28
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl.cpp30
-rw-r--r--src/mongo/db/repl/replication_coordinator_impl_test.cpp19
-rw-r--r--src/mongo/db/server_transactions_metrics.cpp120
-rw-r--r--src/mongo/db/server_transactions_metrics.h69
-rw-r--r--src/mongo/db/transaction_metrics_observer.cpp37
-rw-r--r--src/mongo/db/transaction_metrics_observer.h27
-rw-r--r--src/mongo/db/transaction_participant.cpp43
-rw-r--r--src/mongo/db/transaction_participant.h7
-rw-r--r--src/mongo/db/transaction_participant_test.cpp185
12 files changed, 446 insertions, 125 deletions
diff --git a/jstests/replsets/commit_transaction_recovery.js b/jstests/replsets/commit_transaction_recovery.js
index 1f40a13938a..00b163b8dcc 100644
--- a/jstests/replsets/commit_transaction_recovery.js
+++ b/jstests/replsets/commit_transaction_recovery.js
@@ -48,7 +48,8 @@
// Perform a clean shutdown and restart. Note that the 'disableSnapshotting' failpoint will be
// unset on the node following the restart.
- replTest.restart(primary);
+ replTest.stop(primary, undefined, {skipValidation: true});
+ replTest.start(primary, {}, true);
jsTestLog("Node was restarted");
diff --git a/jstests/replsets/commit_transaction_recovery_data_already_applied.js b/jstests/replsets/commit_transaction_recovery_data_already_applied.js
index d5fc83946db..5ca3326dd38 100644
--- a/jstests/replsets/commit_transaction_recovery_data_already_applied.js
+++ b/jstests/replsets/commit_transaction_recovery_data_already_applied.js
@@ -68,7 +68,8 @@
// will error with BSONTooLarge only if recovery reapplies the operations from the transaction.
// Note that the 'disableSnapshotting' failpoint will be unset on the node following the
// restart.
- replTest.restart(primary);
+ replTest.stop(primary, undefined, {skipValidation: true});
+ replTest.start(primary, {}, true);
jsTestLog("Node was restarted");
diff --git a/jstests/replsets/prepare_conflict_read_concern_behavior.js b/jstests/replsets/prepare_conflict_read_concern_behavior.js
index 3828dea4426..6106248f52b 100644
--- a/jstests/replsets/prepare_conflict_read_concern_behavior.js
+++ b/jstests/replsets/prepare_conflict_read_concern_behavior.js
@@ -66,11 +66,13 @@
.operationTime;
const prepareTimestamp = PrepareHelpers.prepareTransaction(session);
+ // TODO: Once we no longer hold the stable optime behind the earliest prepare optime
+ // whose corresponding commit/abort oplog entry optime is not majority committed, allow
+ // this insert to be majority committed.
const clusterTimeAfterPrepare =
assert
- .commandWorked(testColl.runCommand(
- "insert",
- {documents: [{_id: 3, in_prepared_txn: 3}], writeConcern: {w: "majority"}}))
+ .commandWorked(
+ testColl.runCommand("insert", {documents: [{_id: 3, in_prepared_txn: 3}]}))
.operationTime;
jsTestLog("prepareTimestamp: " + prepareTimestamp + " clusterTimeBeforePrepare: " +
@@ -80,9 +82,12 @@
assert.gt(prepareTimestamp, clusterTimeBeforePrepare);
assert.gt(clusterTimeAfterPrepare, prepareTimestamp);
- jsTestLog(
- "Test read with read concern 'majority' doesn't block on a prepared transaction.");
- assert.commandWorked(read({level: 'majority'}, successTimeout, testDB, collName, 2));
+ // TODO: Once we no longer hold the stable optime behind the earliest prepare optime
+ // whose corresponding commit/abort oplog entry optime is not majority committed, uncomment
+ // this read.
+ // jsTestLog(
+ // "Test read with read concern 'majority' doesn't block on a prepared transaction.");
+ // assert.commandWorked(read({level: 'majority'}, successTimeout, testDB, collName, 2));
jsTestLog("Test read with read concern 'local' doesn't block on a prepared transaction.");
assert.commandWorked(read({level: 'local'}, successTimeout, testDB, collName, 2));
@@ -131,10 +136,13 @@
session2.startTransaction(
{readConcern: {level: "snapshot", atClusterTime: clusterTimeAfterPrepare}});
- jsTestLog("Test read with read concern 'snapshot' and a read timestamp after " +
- "prepareTimestamp on non-prepared documents doesn't block on a prepared " +
- "transaction.");
- assert.commandWorked(read({}, failureTimeout, sessionDB2, collName2, 1));
+ // TODO: Once we no longer hold the stable optime behind the earliest prepare optime
+ // whose corresponding commit/abort oplog entry optime is not majority committed, uncomment
+ // this read.
+ // jsTestLog("Test read with read concern 'snapshot' and a read timestamp after " +
+ // "prepareTimestamp on non-prepared documents doesn't block on a prepared " +
+ // "transaction.");
+ // assert.commandWorked(read({}, failureTimeout, sessionDB2, collName2, 1));
jsTestLog("Test read with read concern 'snapshot' and a read timestamp after " +
"prepareTimestamp blocks on a prepared transaction.");
diff --git a/src/mongo/db/repl/replication_coordinator_impl.cpp b/src/mongo/db/repl/replication_coordinator_impl.cpp
index e036ddbb975..77ebaefdfa3 100644
--- a/src/mongo/db/repl/replication_coordinator_impl.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl.cpp
@@ -72,6 +72,7 @@
#include "mongo/db/repl/vote_requester.h"
#include "mongo/db/server_options.h"
#include "mongo/db/server_parameters.h"
+#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/transaction_participant.h"
#include "mongo/db/write_concern.h"
#include "mongo/db/write_concern_options.h"
@@ -3068,7 +3069,23 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_calculateStableOpTime_inloc
maximumStableTimestamp = std::min(maximumStableTimestamp, holdStableTimestamp);
}
- const auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm());
+ auto maximumStableOpTime = OpTime(maximumStableTimestamp, commitPoint.getTerm());
+
+ // When calculating the stable optime, compare it to the oldest oplog entry timestamp across
+ // transactions whose corresponding commit/abort oplog entries have not been majority committed.
+ const auto serverTxnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext());
+ const auto oldestNonMajCommittedOpTime =
+ serverTxnMetrics->getOldestNonMajorityCommittedOpTime();
+
+ if (oldestNonMajCommittedOpTime) {
+ if (oldestNonMajCommittedOpTime->getTimestamp() < maximumStableTimestamp) {
+ // If there is an oldest non-majority committed timestamp that is less than the current
+ // max stable timestamp, then update the max stable timestamp/optime accordingly.
+ maximumStableTimestamp = oldestNonMajCommittedOpTime->getTimestamp();
+ maximumStableOpTime =
+ OpTime(maximumStableTimestamp, oldestNonMajCommittedOpTime->getTerm());
+ }
+ }
// Find the greatest optime candidate that is less than or equal to the commit point.
// To do this we first find the upper bound of 'commitPoint', which points to the smallest
@@ -3126,6 +3143,16 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
invariant(snapshotOpTime <= commitPoint);
}
+ // If we advanced the commit point and have prepared transactions, check if their commit or
+ // abort timestamps are <= the commit point. If so, remove them from our oldest non-majority
+ // committed optimes set because we know that the commit/abort oplog entries are majority
+ // committed.
+ // We must remove these optimes before calling _calculateStableOpTime_inlock because we want
+ // we want the stable timestamp to advance up to the commit point if all transactions are
+ // committed or aborted.
+ auto txnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext());
+ txnMetrics->removeOpTimesLessThanOrEqToCommittedOpTime(commitPoint);
+
// Compute the current stable optime.
auto stableOpTime = _calculateStableOpTime_inlock(_stableOpTimeCandidates, commitPoint);
if (stableOpTime) {
@@ -3138,7 +3165,6 @@ boost::optional<OpTime> ReplicationCoordinatorImpl::_getStableOpTime_inlock() {
}
void ReplicationCoordinatorImpl::_setStableTimestampForStorage_inlock() {
-
// Get the current stable optime.
auto stableOpTime = _getStableOpTime_inlock();
diff --git a/src/mongo/db/repl/replication_coordinator_impl_test.cpp b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
index 13683f2eee9..0da2179b76e 100644
--- a/src/mongo/db/repl/replication_coordinator_impl_test.cpp
+++ b/src/mongo/db/repl/replication_coordinator_impl_test.cpp
@@ -58,6 +58,7 @@
#include "mongo/db/repl/topology_coordinator.h"
#include "mongo/db/repl/update_position_args.h"
#include "mongo/db/server_options.h"
+#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/db/write_concern_options.h"
#include "mongo/executor/network_interface_mock.h"
@@ -3787,6 +3788,8 @@ TEST_F(StableOpTimeTest, CalculateStableOpTime) {
initReplSetMode();
auto repl = getReplCoord();
+ getStorageInterface()->supportsDocLockingBool = true;
+ auto txnMetrics = ServerTransactionsMetrics::get(getGlobalServiceContext());
OpTime commitPoint;
boost::optional<OpTime> expectedStableOpTime, stableOpTime;
std::set<OpTime> stableOpTimeCandidates;
@@ -3849,6 +3852,22 @@ TEST_F(StableOpTimeTest, CalculateStableOpTime) {
expectedStableOpTime = OpTime({0, 1}, term);
stableOpTime = repl->calculateStableOpTime_forTest(stableOpTimeCandidates, commitPoint);
ASSERT_EQ(expectedStableOpTime, stableOpTime);
+
+ // Set the oldest oplog entry OpTime for non-majority committed aborts/commits associated
+ // with a multi-document transaction to be before the current commit point. We will then
+ // make expectedStableOpTime the new stable optime because it is the only candidate before
+ // the oldestNonMajCommittedOpTime.
+ commitPoint = OpTime({1, 5}, term);
+ const auto oldestNonMajCommittedOpTime = OpTime({1, 3}, term);
+ const auto finishOpTime = OpTime({1, 3}, term);
+ stableOpTimeCandidates = {oldestNonMajCommittedOpTime, OpTime({1, 4}, term)};
+
+ // Adds the oldestNonMajCommittedOpTime to both sets.
+ txnMetrics->addActiveOpTime(oldestNonMajCommittedOpTime);
+ // Update the finishOpTime.
+ txnMetrics->removeActiveOpTime(oldestNonMajCommittedOpTime, finishOpTime);
+ stableOpTime = repl->calculateStableOpTime_forTest(stableOpTimeCandidates, commitPoint);
+ ASSERT_EQ(oldestNonMajCommittedOpTime, stableOpTime);
}
TEST_F(StableOpTimeTest, CleanupStableOpTimeCandidates) {
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp
index 43d1c9d4c9e..3386bc19c42 100644
--- a/src/mongo/db/server_transactions_metrics.cpp
+++ b/src/mongo/db/server_transactions_metrics.cpp
@@ -113,34 +113,114 @@ void ServerTransactionsMetrics::incrementTotalCommitted() {
_totalCommitted.fetchAndAdd(1);
}
-boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const {
- if (_oldestActiveOplogEntryTS.empty()) {
+boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const {
+ if (_oldestActiveOplogEntryOpTimes.empty()) {
return boost::none;
}
- return *(_oldestActiveOplogEntryTS.begin());
+ return *(_oldestActiveOplogEntryOpTimes.begin());
}
-void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) {
- auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS);
- // If ret.second is false, the timestamp we tried to insert already existed.
- invariant(ret.second == true,
- str::stream() << "This oplog entry timestamp already exists."
- << "TS: "
- << oldestOplogEntryTS.toString());
+void ServerTransactionsMetrics::addActiveOpTime(repl::OpTime oldestOplogEntryOpTime) {
+ auto ret = _oldestActiveOplogEntryOpTimes.insert(oldestOplogEntryOpTime);
+ // If ret.second is false, the OpTime we tried to insert already existed.
+ invariant(ret.second,
+ str::stream() << "This oplog entry OpTime already exists in "
+ << "oldestActiveOplogEntryOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
+
+ // Add this OpTime to the oldestNonMajorityCommittedOpTimes set with a finishOpTime of
+ // Timestamp::max() to signify that it has not been committed/aborted.
+ std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime,
+ repl::OpTime::max());
+ auto ret2 = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime);
+ // If ret2.second is false, the OpTime we tried to insert already existed.
+ invariant(ret2.second,
+ str::stream() << "This oplog entry OpTime already exists in "
+ << "oldestNonMajorityCommittedOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
}
-void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) {
- auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS);
- invariant(it != _oldestActiveOplogEntryTS.end(),
- str::stream() << "This oplog entry timestamp does not exist "
- << "or has already been removed."
- << "TS: "
- << oldestOplogEntryTS.toString());
- _oldestActiveOplogEntryTS.erase(it);
+void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> finishOpTime) {
+ auto it = _oldestActiveOplogEntryOpTimes.find(oldestOplogEntryOpTime);
+ invariant(it != _oldestActiveOplogEntryOpTimes.end(),
+ str::stream() << "This oplog entry OpTime does not exist in or has already been "
+ << "removed from oldestActiveOplogEntryOpTimes."
+ << "OpTime: "
+ << oldestOplogEntryOpTime.toString());
+ _oldestActiveOplogEntryOpTimes.erase(it);
+
+ if (!finishOpTime) {
+ return;
+ }
+
+ // The transaction's oldestOplogEntryOpTime now has a corresponding finishTime, which will
+ // be its commit or abort oplog entry OpTime. Add this pair to the
+ // oldestNonMajorityCommittedOpTimes.
+ // Currently, the oldestOplogEntryOpTime will be a prepareOpTime so we will only have a
+ // finishOpTime if we are committing a prepared transaction or aborting an active prepared
+ // transaction.
+ std::pair<repl::OpTime, repl::OpTime> opTimeToRemove(oldestOplogEntryOpTime,
+ repl::OpTime::max());
+ auto it2 = _oldestNonMajorityCommittedOpTimes.find(opTimeToRemove);
+ invariant(it2 != _oldestNonMajorityCommittedOpTimes.end(),
+ str::stream() << "This oplog entry OpTime does not exist in or has already been "
+ << "removed from oldestNonMajorityCommittedOpTimes"
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
+ _oldestNonMajorityCommittedOpTimes.erase(it2);
+
+ std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime,
+ *finishOpTime);
+ auto ret = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime);
+ // If ret.second is false, the OpTime we tried to insert already existed.
+ invariant(ret.second,
+ str::stream() << "This oplog entry OpTime pair already exists in "
+ << "oldestNonMajorityCommittedOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString()
+ << "finishOpTime: "
+ << finishOpTime->toString());
+}
+
+boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestNonMajorityCommittedOpTime()
+ const {
+ if (_oldestNonMajorityCommittedOpTimes.empty()) {
+ return boost::none;
+ }
+ const auto oldestNonMajorityCommittedOpTime = _oldestNonMajorityCommittedOpTimes.begin()->first;
+ invariant(!oldestNonMajorityCommittedOpTime.isNull());
+ return oldestNonMajorityCommittedOpTime;
+}
+
+void ServerTransactionsMetrics::removeOpTimesLessThanOrEqToCommittedOpTime(
+ repl::OpTime committedOpTime) {
+ // Iterate through oldestNonMajorityCommittedOpTimes and remove all pairs whose "finishOpTime"
+ // is now less than or equal to the commit point.
+ for (auto it = _oldestNonMajorityCommittedOpTimes.begin();
+ it != _oldestNonMajorityCommittedOpTimes.end();) {
+ if (it->second <= committedOpTime) {
+ it = _oldestNonMajorityCommittedOpTimes.erase(it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+boost::optional<repl::OpTime>
+ServerTransactionsMetrics::getFinishOpTimeOfOldestNonMajCommitted_forTest() const {
+ if (_oldestNonMajorityCommittedOpTimes.empty()) {
+ return boost::none;
+ }
+ const auto oldestNonMajorityCommittedOpTime =
+ _oldestNonMajorityCommittedOpTimes.begin()->second;
+ return oldestNonMajorityCommittedOpTime;
}
-unsigned int ServerTransactionsMetrics::getTotalActiveTS() const {
- return _oldestActiveOplogEntryTS.size();
+unsigned int ServerTransactionsMetrics::getTotalActiveOpTimes() const {
+ return _oldestActiveOplogEntryOpTimes.size();
}
void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) {
diff --git a/src/mongo/db/server_transactions_metrics.h b/src/mongo/db/server_transactions_metrics.h
index f82dfb179e6..7dbc208e0c6 100644
--- a/src/mongo/db/server_transactions_metrics.h
+++ b/src/mongo/db/server_transactions_metrics.h
@@ -32,8 +32,8 @@
#include <set>
-#include "mongo/bson/timestamp.h"
#include "mongo/db/operation_context.h"
+#include "mongo/db/repl/optime.h"
#include "mongo/db/service_context.h"
#include "mongo/db/transactions_stats_gen.h"
@@ -73,26 +73,56 @@ public:
void incrementTotalCommitted();
/**
- * Returns the Timestamp of the oldest oplog entry written across all open transactions.
- * Returns boost::none if there are no transaction oplog entry Timestamps stored.
+ * Returns the OpTime of the oldest oplog entry written across all open transactions.
+ * Returns boost::none if there are no transaction oplog entry OpTimes stored.
*/
- boost::optional<Timestamp> getOldestActiveTS() const;
+ boost::optional<repl::OpTime> getOldestActiveOpTime() const;
/**
- * Add the transaction's oplog entry Timestamp to a set of Timestamps.
+ * Add the transaction's oplog entry OpTime to oldestActiveOplogEntryOpTimes, a set of OpTimes.
+ * Also creates a pair with this OpTime and OpTime::max() as the corresponding commit/abort
+ * oplog entry OpTime. Finally, adds this to oldestNonMajorityCommittedOpTimes.
*/
- void addActiveTS(Timestamp oldestOplogEntryTS);
+ void addActiveOpTime(repl::OpTime oldestOplogEntryOpTime);
/**
- * Remove the corresponding transaction oplog entry Timestamp if the transaction commits or
- * aborts.
+ * Remove the corresponding transaction oplog entry OpTime if the transaction commits or
+ * aborts. Also updates the pair in oldestNonMajorityCommittedOpTimes with the
+ * oldestOplogEntryOpTime to have a valid finishOpTime instead of OpTime::max(). It's stored in
+ * the format: < oldestOplogEntryOpTime, finishOpTime >.
*/
- void removeActiveTS(Timestamp oldestOplogEntryTS);
+ void removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> finishOpTime);
/**
- * Returns the number of transaction oplog entry Timestamps currently stored.
+ * Returns the number of transaction oplog entry OpTimes currently stored.
*/
- unsigned int getTotalActiveTS() const;
+ unsigned int getTotalActiveOpTimes() const;
+
+ /**
+ * Returns the oldest oplog entry OpTime across transactions whose corresponding commit or
+ * abort oplog entry has not been majority committed.
+ */
+ boost::optional<repl::OpTime> getOldestNonMajorityCommittedOpTime() const;
+
+ /**
+ * Remove the corresponding transaction oplog entry OpTime pair from
+ * oldestNonMajorityCommittedOpTimes if the transaction is majority committed or aborted.
+ * We determine this by checking if there are any pairs in the set whose
+ * 'finishOpTime' <= 'committedOpTime'.
+ */
+ void removeOpTimesLessThanOrEqToCommittedOpTime(repl::OpTime committedOpTime);
+
+ /**
+ * Testing function that adds an OpTime pair to oldestNonMajorityCommittedOpTimes.
+ */
+ void addNonMajCommittedOpTimePair_forTest(std::pair<repl::OpTime, repl::OpTime> OpTimePair);
+
+ /**
+ * Testing function that returns the oldest non-majority committed OpTime pair in the form:
+ * < oldestOplogEntryOpTime, finishOpTime >.
+ */
+ boost::optional<repl::OpTime> getFinishOpTimeOfOldestNonMajCommitted_forTest() const;
/**
* Appends the accumulated stats to a transactions stats object.
@@ -118,10 +148,21 @@ private:
// The total number of multi-document transaction commits.
AtomicUInt64 _totalCommitted{0};
- // Maintain the oldest oplog entry Timestamp across all active transactions. Currently, we only
+ // Maintain the oldest oplog entry OpTime across all active transactions. Currently, we only
// write an oplog entry for an ongoing transaction if it is in the `prepare` state. By
- // maintaining an ordered set of timestamps, the timestamp at the beginning will be the oldest.
- std::set<Timestamp> _oldestActiveOplogEntryTS;
+ // maintaining an ordered set of OpTimes, the OpTime at the beginning will be the oldest.
+ std::set<repl::OpTime> _oldestActiveOplogEntryOpTimes;
+
+ // Maintain the oldest oplog entry OpTime across transactions whose corresponding abort/commit
+ // oplog entries have not been majority committed. Since this is an ordered set, the first
+ // pair's oldestOplogEntryOpTime represents the earliest OpTime that we should pin the stable
+ // timestamp behind.
+ // Each pair is structured as follows: <oldestOplogEntryOpTime, finishOpTime>
+ // 'oldestOplogEntryOpTime': The first oplog entry OpTime written by a transaction.
+ // 'finishOpTime': The commit/abort oplog entry OpTime.
+ // Once the corresponding abort/commit entry has been majority committed, remove the pair from
+ // this set.
+ std::set<std::pair<repl::OpTime, repl::OpTime>> _oldestNonMajorityCommittedOpTimes;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_metrics_observer.cpp b/src/mongo/db/transaction_metrics_observer.cpp
index 10109763fbc..eea17ad5379 100644
--- a/src/mongo/db/transaction_metrics_observer.cpp
+++ b/src/mongo/db/transaction_metrics_observer.cpp
@@ -98,8 +98,11 @@ void TransactionMetricsObserver::onUnstash(ServerTransactionsMetrics* serverTran
void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> commitOpTime,
Top* top) {
+ invariant((oldestOplogEntryOpTime != boost::none && commitOpTime != boost::none) ||
+ (oldestOplogEntryOpTime == boost::none && commitOpTime == boost::none));
//
// Per transaction metrics.
//
@@ -124,16 +127,20 @@ void TransactionMetricsObserver::onCommit(ServerTransactionsMetrics* serverTrans
durationCount<Microseconds>(_singleTransactionStats.getDuration(tickSource, curTick));
top->incrementGlobalTransactionLatencyStats(static_cast<uint64_t>(duration));
- // Remove this transaction's oldest oplog entry Timestamp if one was written.
- if (oldestOplogEntryTS) {
- serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ // Remove this transaction's oldest oplog entry OpTime if one was written.
+ if (oldestOplogEntryOpTime) {
+ serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, commitOpTime);
}
}
void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> abortOpTime,
Top* top) {
+ invariant((oldestOplogEntryOpTime != boost::none && abortOpTime != boost::none) ||
+ (oldestOplogEntryOpTime == boost::none && abortOpTime == boost::none));
+
auto curTick = tickSource->getTicks();
_onAbort(serverTransactionsMetrics, curTick, tickSource, top);
//
@@ -150,16 +157,16 @@ void TransactionMetricsObserver::onAbortActive(ServerTransactionsMetrics* server
//
serverTransactionsMetrics->decrementCurrentActive();
- // Remove this transaction's oldest oplog entry Timestamp if one was written.
- if (oldestOplogEntryTS) {
- serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ // Remove this transaction's oldest oplog entry OpTime if one was written.
+ if (oldestOplogEntryOpTime) {
+ serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, abortOpTime);
}
}
void TransactionMetricsObserver::onAbortInactive(
ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
Top* top) {
auto curTick = tickSource->getTicks();
_onAbort(serverTransactionsMetrics, curTick, tickSource, top);
@@ -169,9 +176,9 @@ void TransactionMetricsObserver::onAbortInactive(
//
serverTransactionsMetrics->decrementCurrentInactive();
- // Remove this transaction's oldest oplog entry Timestamp if one was written.
- if (oldestOplogEntryTS) {
- serverTransactionsMetrics->removeActiveTS(*oldestOplogEntryTS);
+ // Remove this transaction's oldest oplog entry OpTime if one was written.
+ if (oldestOplogEntryOpTime) {
+ serverTransactionsMetrics->removeActiveOpTime(*oldestOplogEntryOpTime, boost::none);
}
}
@@ -208,11 +215,11 @@ void TransactionMetricsObserver::_onAbort(ServerTransactionsMetrics* serverTrans
}
void TransactionMetricsObserver::onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics,
- Timestamp prepareTimestamp) {
+ repl::OpTime prepareOpTime) {
// Since we currently only write an oplog entry for an in progress transaction when it is in
- // the prepare state, the prepareTimestamp is currently the oldest timestamp written to the
+ // the prepare state, the prepareOpTime is currently the oldest OpTime written to the
// oplog for this transaction.
- serverTransactionsMetrics->addActiveTS(prepareTimestamp);
+ serverTransactionsMetrics->addActiveOpTime(prepareOpTime);
}
} // namespace mongo
diff --git a/src/mongo/db/transaction_metrics_observer.h b/src/mongo/db/transaction_metrics_observer.h
index d1a3bba4244..715aa14244f 100644
--- a/src/mongo/db/transaction_metrics_observer.h
+++ b/src/mongo/db/transaction_metrics_observer.h
@@ -70,37 +70,46 @@ public:
/**
* Updates relevant metrics when a transaction commits. Also removes this transaction's oldest
- * oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
+ * oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not boost::none.
+ * Finally, updates an entry in oldestNonMajorityCommittedOpTimes to include its commit OpTime.
*/
void onCommit(ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> commitOpTime,
Top* top);
/**
* Updates relevant metrics when an active transaction aborts. Also removes this transaction's
- * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
+ * oldest oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not
+ * boost::none.
+ * Finally, updates an entry in oldestNonMajorityCommittedOpTimes to include its abort OpTime.
*/
void onAbortActive(ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> abortOpTime,
Top* top);
/**
* Updates relevant metrics when an inactive transaction aborts. Also removes this transaction's
- * oldest oplog entry Timestamp from the oldestActiveOplogEntryTS set if it is not boost::none.
+ * oldest oplog entry OpTime from the oldestActiveOplogEntryOpTimes set if it is not
+ * boost::none.
+ * Does not accept an optional abortOpTime parameter because we cannot abort an inactive
+ * prepared transaction. Instead, uses boost::none as the abortOpTime, which subsequently will
+ * not modify oldestNonMajorityCommittedOpTimes.
*/
void onAbortInactive(ServerTransactionsMetrics* serverTransactionsMetrics,
TickSource* tickSource,
- boost::optional<Timestamp> oldestOplogEntryTS,
+ boost::optional<repl::OpTime> oldestOplogEntryOpTime,
Top* top);
/**
- * Adds the prepareTimestamp, which is currently the Timestamp of the first oplog entry written
- * by an active transaction, to the oldestActiveOplogEntryTS set.
+ * Adds the prepareOpTime, which is currently the Timestamp of the first oplog entry written by
+ * an active transaction, to the oldestActiveOplogEntryTS set.
*/
void onPrepare(ServerTransactionsMetrics* serverTransactionsMetrics,
- Timestamp prepareTimestamp);
+ repl::OpTime prepareOpTime);
/**
* Updates relevant metrics when an operation running on the transaction completes. An operation
diff --git a/src/mongo/db/transaction_participant.cpp b/src/mongo/db/transaction_participant.cpp
index 0ca9a2367d9..95cbf2dd09b 100644
--- a/src/mongo/db/transaction_participant.cpp
+++ b/src/mongo/db/transaction_participant.cpp
@@ -823,20 +823,20 @@ Timestamp TransactionParticipant::prepareTransaction(OperationContext* opCtx,
abortGuard.Dismiss();
- invariant(!_oldestOplogEntryTS,
+ invariant(!_oldestOplogEntryOpTime,
str::stream() << "This transaction's oldest oplog entry Timestamp has already "
<< "been set to: "
- << _oldestOplogEntryTS->toString());
+ << _oldestOplogEntryOpTime->toString());
// Keep track of the Timestamp from the first oplog entry written by this transaction.
- _oldestOplogEntryTS = prepareOplogSlot.opTime.getTimestamp();
+ _oldestOplogEntryOpTime = prepareOplogSlot.opTime;
- // Maintain the Timestamp of the oldest active oplog entry for this transaction. We currently
+ // Maintain the OpTime of the oldest active oplog entry for this transaction. We currently
// only write an oplog entry for an in progress transaction when it is in the prepare state
// but this will change when we allow multiple oplog entries per transaction.
{
stdx::lock_guard<stdx::mutex> lm(_metricsMutex);
_transactionMetricsObserver.onPrepare(ServerTransactionsMetrics::get(opCtx),
- *_oldestOplogEntryTS);
+ *_oldestOplogEntryOpTime);
}
return prepareOplogSlot.opTime.getTimestamp();
@@ -895,10 +895,10 @@ void TransactionParticipant::commitUnpreparedTransaction(OperationContext* opCtx
!_txnState.isPrepared(lk));
// TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
- invariant(!_oldestOplogEntryTS,
+ invariant(!_oldestOplogEntryOpTime,
str::stream() << "The oldest oplog entry Timestamp should not have been set because "
<< "this transaction is not prepared. But, it is currently "
- << _oldestOplogEntryTS->toString());
+ << _oldestOplogEntryOpTime->toString());
// We need to unlock the session to run the opObserver onTransactionCommit, which calls back
// into the session.
@@ -967,6 +967,11 @@ void TransactionParticipant::commitPreparedTransaction(OperationContext* opCtx,
lk.lock();
_checkIsActiveTransaction(lk, *opCtx->getTxnNumber(), true);
+ // If we are committing a prepared transaction, then we must have already recorded this
+ // transaction's oldest oplog entry optime.
+ invariant(_oldestOplogEntryOpTime);
+ _finishOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+
_finishCommitTransaction(lk, opCtx);
} catch (...) {
// It is illegal for committing a prepared transaction to fail for any reason, other than an
@@ -1015,7 +1020,8 @@ void TransactionParticipant::_finishCommitTransaction(WithLock lk, OperationCont
auto tickSource = opCtx->getServiceContext()->getTickSource();
_transactionMetricsObserver.onCommit(ServerTransactionsMetrics::get(opCtx),
tickSource,
- _oldestOplogEntryTS,
+ _oldestOplogEntryOpTime,
+ _finishOpTime,
&Top::get(getGlobalServiceContext()));
_transactionMetricsObserver.onTransactionOperation(
opCtx->getClient(), CurOp::get(opCtx)->debug().additiveMetrics);
@@ -1097,10 +1103,10 @@ void TransactionParticipant::abortActiveUnpreparedOrStashPreparedTransaction(
}
// TODO SERVER-37129: Remove this invariant once we allow transactions larger than 16MB.
- invariant(!_oldestOplogEntryTS,
+ invariant(!_oldestOplogEntryOpTime,
str::stream() << "The oldest oplog entry Timestamp should not have been set because "
<< "this transaction is not prepared. But, it is currently "
- << _oldestOplogEntryTS->toString());
+ << _oldestOplogEntryOpTime->toString());
_abortActiveTransaction(std::move(lock), opCtx, TransactionState::kInProgress);
} catch (...) {
@@ -1149,6 +1155,12 @@ void TransactionParticipant::_abortActiveTransaction(stdx::unique_lock<stdx::mut
lock.lock();
// We do not check if the active transaction number is correct here because we handle it below.
+ // Set the finishOpTime of this transaction if we have recorded this transaction's oldest oplog
+ // entry optime.
+ if (_oldestOplogEntryOpTime) {
+ _finishOpTime = repl::ReplClientInfo::forClient(opCtx->getClient()).getLastOp();
+ }
+
// Only abort the transaction in session if it's in expected states.
// When the state of active transaction on session is not expected, it means another
// thread has already aborted the transaction on session.
@@ -1185,7 +1197,7 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortInactive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
tickSource,
- _oldestOplogEntryTS,
+ _oldestOplogEntryOpTime,
&Top::get(getGlobalServiceContext()));
}
_logSlowTransaction(wl,
@@ -1198,7 +1210,8 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionMetricsObserver.onAbortActive(
ServerTransactionsMetrics::get(getGlobalServiceContext()),
tickSource,
- _oldestOplogEntryTS,
+ _oldestOplogEntryOpTime,
+ _finishOpTime,
&Top::get(getGlobalServiceContext()));
}
@@ -1206,7 +1219,8 @@ void TransactionParticipant::_abortTransactionOnSession(WithLock wl) {
_transactionOperations.clear();
_txnState.transitionTo(wl, TransactionState::kAborted);
_prepareOpTime = repl::OpTime();
- _oldestOplogEntryTS = boost::none;
+ _oldestOplogEntryOpTime = boost::none;
+ _finishOpTime = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
}
@@ -1537,7 +1551,8 @@ void TransactionParticipant::_setNewTxnNumber(WithLock wl, const TxnNumber& txnN
// Reset the transactional state
_txnState.transitionTo(wl, TransactionState::kNone);
_prepareOpTime = repl::OpTime();
- _oldestOplogEntryTS = boost::none;
+ _oldestOplogEntryOpTime = boost::none;
+ _finishOpTime = boost::none;
_speculativeTransactionReadOpTime = repl::OpTime();
_multikeyPathInfo.clear();
_autoCommit = boost::none;
diff --git a/src/mongo/db/transaction_participant.h b/src/mongo/db/transaction_participant.h
index 451f502b5bb..a9830833c17 100644
--- a/src/mongo/db/transaction_participant.h
+++ b/src/mongo/db/transaction_participant.h
@@ -800,8 +800,11 @@ private:
// Tracks and updates transaction metrics upon the appropriate transaction event.
TransactionMetricsObserver _transactionMetricsObserver;
- // Tracks the Timestamp of the first oplog entry written by this TransactionParticipant.
- boost::optional<Timestamp> _oldestOplogEntryTS;
+ // Tracks the OpTime of the first oplog entry written by this TransactionParticipant.
+ boost::optional<repl::OpTime> _oldestOplogEntryOpTime;
+
+ // Tracks the OpTime of the abort/commit oplog entry associated with this transaction.
+ boost::optional<repl::OpTime> _finishOpTime;
};
} // namespace mongo
diff --git a/src/mongo/db/transaction_participant_test.cpp b/src/mongo/db/transaction_participant_test.cpp
index 11c35d5f597..70c5fe92ce2 100644
--- a/src/mongo/db/transaction_participant_test.cpp
+++ b/src/mongo/db/transaction_participant_test.cpp
@@ -41,6 +41,7 @@
#include "mongo/db/repl/oplog.h"
#include "mongo/db/repl/oplog_entry.h"
#include "mongo/db/repl/optime.h"
+#include "mongo/db/repl/repl_client_info.h"
#include "mongo/db/server_transactions_metrics.h"
#include "mongo/db/service_context.h"
#include "mongo/db/session_catalog_mongod.h"
@@ -882,7 +883,8 @@ TEST_F(TxnParticipantTest, KillSessionsDuringPrepareDoesNotAbortTransaction) {
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
ASSERT(_opObserver->transactionPrepared);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
}
@@ -1078,7 +1080,8 @@ TEST_F(TxnParticipantTest, KillSessionsDoesNotAbortPreparedTransactions) {
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransaction();
@@ -1104,7 +1107,8 @@ TEST_F(TxnParticipantTest, TransactionTimeoutDoesNotAbortPreparedTransactions) {
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
txnParticipant->abortArbitraryTransactionIfExpired();
@@ -1132,7 +1136,8 @@ TEST_F(TxnParticipantTest, CannotStartNewTransactionWhilePreparedTransactionInPr
ASSERT_EQ(ruPrepareTimestamp, prepareTimestamp);
// Check that the oldest prepareTimestamp is the one we just set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
txnParticipant->stashTransactionResources(opCtx());
@@ -1223,7 +1228,8 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti
auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
// Check that the oldest prepareTimestamp is the one we just set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), prepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
auto sessionId = *opCtx()->getLogicalSessionId();
auto txnNum = *opCtx()->getTxnNumber();
@@ -1242,7 +1248,7 @@ DEATH_TEST_F(TxnParticipantTest, AbortIsIllegalDuringCommittingPreparedTransacti
txnParticipant->commitPreparedTransaction(opCtx(), prepareTimestamp);
// Check that we removed the prepareTimestamp from the set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none);
}
TEST_F(TxnParticipantTest, CannotContinueNonExistentTransaction) {
@@ -2975,23 +2981,20 @@ TEST_F(TransactionsMetricsTest, LogTransactionInfoAfterSlowStashedAbort) {
}
TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) {
- auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo());
auto txnParticipant = TransactionParticipant::get(opCtx());
// Check that there are no Timestamps in the set.
- unsigned int zero = 0;
- ASSERT_EQ(totalActiveTxnTS, zero);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U);
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
auto firstPrepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
// Check that we added a Timestamp to the set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
- // totalActiveTxnTS = 1
- totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U);
// Check that the oldest prepareTimestamp is equal to firstPrepareTimestamp because there is
// only one prepared transaction on this Service.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), firstPrepareTimestamp);
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), firstPrepareTimestamp);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
txnParticipant->stashTransactionResources(opCtx());
@@ -3023,13 +3026,10 @@ TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) {
secondPrepareTimestamp = newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
ASSERT_GT(secondPrepareTimestamp, firstPrepareTimestamp);
// Check that we added a Timestamp to the set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
- totalActiveTxnTS + 1);
- // totalActiveTxnTS = 2
- totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 2U);
// The oldest prepareTimestamp should still be firstPrepareTimestamp.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(),
- firstPrepareTimestamp);
+ prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), firstPrepareTimestamp);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
}
@@ -3041,25 +3041,22 @@ TEST_F(TxnParticipantTest, WhenOldestTSRemovedNextOldestBecomesNewOldest) {
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), secondPrepareTimestamp);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U);
+ prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), secondPrepareTimestamp);
}
TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) {
- auto totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo());
auto txnParticipant = TransactionParticipant::get(opCtx());
// Check that there are no Timestamps in the set.
- unsigned int zero = 0;
- ASSERT_EQ(totalActiveTxnTS, zero);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U);
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->prepareTransaction(opCtx(), {});
// Check that we added a Timestamp to the set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS + 1);
- // totalActiveTxnTS = 1
- totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U);
ASSERT_FALSE(txnParticipant->transactionIsAborted());
txnParticipant->stashTransactionResources(opCtx());
@@ -3089,32 +3086,146 @@ TEST_F(TxnParticipantTest, ReturnNullTimestampIfNoOldestActiveTimestamp) {
// transaction was prepared after.
newTxnParticipant->prepareTransaction(newOpCtx.get(), {});
// Check that we added a Timestamp to the set.
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
- totalActiveTxnTS + 1);
- // totalActiveTxnTS = 2
- totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 2U);
// The oldest prepareTimestamp should still be firstPrepareTimestamp.
ASSERT_FALSE(txnParticipant->transactionIsAborted());
// Abort this transaction and check that we have decremented the total active timestamps
// count.
newTxnParticipant->abortActiveTransaction(newOpCtx.get());
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(),
- totalActiveTxnTS - 1);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U);
}
Client::releaseCurrent();
Client::setCurrent(std::move(originalClient));
- // totalActiveTxnTS = 1
- totalActiveTxnTS = ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS();
// Switch clients and abort the first transaction. This means we no longer have an oldest active
// timestamp.
txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
txnParticipant->abortActiveTransaction(opCtx());
ASSERT(txnParticipant->transactionIsAborted());
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveTS(), totalActiveTxnTS - 1);
- ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveTS(), boost::none);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none);
+}
+
+TEST_F(TxnParticipantTest, ProperlyMaintainOldestNonMajorityCommittedOpTimeSet) {
+ OperationContextSessionMongod opCtxSession(opCtx(), true, makeSessionInfo());
+ auto txnParticipant = TransactionParticipant::get(opCtx());
+
+ // Check that there are no Timestamps in the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U);
+
+ txnParticipant->unstashTransactionResources(opCtx(), "prepareTransaction");
+ auto prepareTimestamp = txnParticipant->prepareTransaction(opCtx(), {});
+ // Check that we added a Timestamp to the set.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 1U);
+
+ // Check that the oldest prepareTimestamp is equal to first prepareTimestamp because there is
+ // only one prepared transaction on this Service.
+ auto prepareOpTime = ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime();
+ ASSERT_EQ(prepareOpTime->getTimestamp(), prepareTimestamp);
+
+ // Check that oldestNonMajorityCommittedOpTimes also has this prepareTimestamp and that the
+ // pair's finishOpTime is Timestamp::max() because this transaction has not been
+ // committed/aborted.
+ auto nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+ ASSERT_EQ(nonMajorityCommittedOpTime->getTimestamp(), prepareTimestamp);
+ auto nonMajorityCommittedOpTimeFinishOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getFinishOpTimeOfOldestNonMajCommitted_forTest();
+ ASSERT_EQ(nonMajorityCommittedOpTimeFinishOpTime->getTimestamp(), Timestamp::max());
+
+ ASSERT_FALSE(txnParticipant->transactionIsAborted());
+ // Since this test uses a mock opObserver, we have to manually set the finishTimestamp on the
+ // txnParticipant.
+ auto finishOpTime = repl::OpTime({10, 10}, 0);
+ repl::ReplClientInfo::forClient(opCtx()->getClient()).setLastOp(finishOpTime);
+
+ txnParticipant->abortActiveTransaction(opCtx());
+ ASSERT(txnParticipant->transactionIsAborted());
+
+ // Make sure that we moved the OpTime from the oldestActiveOplogEntryOpTimes to
+ // oldestNonMajorityCommittedOpTimes along with the abort/commit oplog entry OpTime
+ // associated with the transaction.
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getTotalActiveOpTimes(), 0U);
+ ASSERT_EQ(ServerTransactionsMetrics::get(opCtx())->getOldestActiveOpTime(), boost::none);
+
+ nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+ nonMajorityCommittedOpTimeFinishOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getFinishOpTimeOfOldestNonMajCommitted_forTest();
+ ASSERT_FALSE(nonMajorityCommittedOpTime == boost::none);
+ ASSERT_FALSE(nonMajorityCommittedOpTimeFinishOpTime == boost::none);
+ ASSERT_EQ(nonMajorityCommittedOpTime->getTimestamp(), prepareTimestamp);
+ ASSERT_EQ(nonMajorityCommittedOpTimeFinishOpTime, finishOpTime);
+
+ // If we pass in a mock commit point that is greater than the finish timestamp of the
+ // oldestNonMajorityCommittedOpTime, it should be removed from the set. This would mean that
+ // the abort/commit oplog entry is majority committed.
+ ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime(
+ repl::OpTime::max());
+ nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+ ASSERT_EQ(nonMajorityCommittedOpTime, boost::none);
+}
+
+TEST_F(TxnParticipantTest, GetOldestNonMajorityCommittedOpTimeReturnsOldestEntry) {
+ const auto earlierOpTime = repl::OpTime({1, 1}, 0);
+ const auto earlierFinishOpTime = repl::OpTime({3, 2}, 0);
+
+ const auto middleOpTime = repl::OpTime({1, 2}, 0);
+ const auto middleFinishOpTime = repl::OpTime({3, 3}, 0);
+
+ const auto laterOpTime = repl::OpTime({1, 3}, 0);
+ const auto laterFinishOpTime = repl::OpTime({3, 4}, 0);
+
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(earlierOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(earlierOpTime, earlierFinishOpTime);
+
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(middleOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(middleOpTime, middleFinishOpTime);
+
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(laterOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(laterOpTime, laterFinishOpTime);
+
+ auto nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+
+ ASSERT_EQ(*nonMajorityCommittedOpTime, repl::OpTime({1, 1}, 0));
+
+ // If we pass in a mock commit point that is greater than the finish timestamp of the
+ // oldestNonMajorityCommittedOpTime, it should be removed from the set. This would mean that
+ // the abort/commit oplog entry is majority committed.
+ ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime(
+ repl::OpTime::max());
+ nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+ ASSERT_EQ(nonMajorityCommittedOpTime, boost::none);
+
+ // Test that we can remove only a part of the set by passing in a commit point that is only
+ // greater than or equal to two of the optimes.
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(earlierOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(earlierOpTime, earlierFinishOpTime);
+
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(middleOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(middleOpTime, middleFinishOpTime);
+
+ ServerTransactionsMetrics::get(opCtx())->addActiveOpTime(laterOpTime);
+ ServerTransactionsMetrics::get(opCtx())->removeActiveOpTime(laterOpTime, laterFinishOpTime);
+
+ nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+
+ ASSERT_EQ(*nonMajorityCommittedOpTime, earlierOpTime);
+
+ ServerTransactionsMetrics::get(opCtx())->removeOpTimesLessThanOrEqToCommittedOpTime(
+ repl::OpTime({3, 3}, 0));
+ nonMajorityCommittedOpTime =
+ ServerTransactionsMetrics::get(opCtx())->getOldestNonMajorityCommittedOpTime();
+
+ // earlierOpTime and middleOpTime must have been removed because their finishOpTime are less
+ // than or equal to the mock commit point.
+ ASSERT_EQ(nonMajorityCommittedOpTime, laterOpTime);
}