summaryrefslogtreecommitdiff
path: root/src/mongo/db/server_transactions_metrics.cpp
diff options
context:
space:
mode:
authorPavi Vetriselvan <pvselvan@umich.edu>2018-10-30 14:45:38 -0400
committerPavi Vetriselvan <pvselvan@umich.edu>2018-10-30 14:52:19 -0400
commit33ac1afd4079e04d12554f9b79d1ab07426caf59 (patch)
treee18f05ce76cec36901b76ed4cf8f6ba2ba97fb99 /src/mongo/db/server_transactions_metrics.cpp
parent6d475fdb5a76acab760ce4b6709b60a4c8c9aec6 (diff)
downloadmongo-33ac1afd4079e04d12554f9b79d1ab07426caf59.tar.gz
SERVER-35811 pin stable timestamp behind oldest uncommitted timestamp
Diffstat (limited to 'src/mongo/db/server_transactions_metrics.cpp')
-rw-r--r--src/mongo/db/server_transactions_metrics.cpp120
1 files changed, 100 insertions, 20 deletions
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp
index 43d1c9d4c9e..3386bc19c42 100644
--- a/src/mongo/db/server_transactions_metrics.cpp
+++ b/src/mongo/db/server_transactions_metrics.cpp
@@ -113,34 +113,114 @@ void ServerTransactionsMetrics::incrementTotalCommitted() {
_totalCommitted.fetchAndAdd(1);
}
-boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const {
- if (_oldestActiveOplogEntryTS.empty()) {
+boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const {
+ if (_oldestActiveOplogEntryOpTimes.empty()) {
return boost::none;
}
- return *(_oldestActiveOplogEntryTS.begin());
+ return *(_oldestActiveOplogEntryOpTimes.begin());
}
-void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) {
- auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS);
- // If ret.second is false, the timestamp we tried to insert already existed.
- invariant(ret.second == true,
- str::stream() << "This oplog entry timestamp already exists."
- << "TS: "
- << oldestOplogEntryTS.toString());
+void ServerTransactionsMetrics::addActiveOpTime(repl::OpTime oldestOplogEntryOpTime) {
+ auto ret = _oldestActiveOplogEntryOpTimes.insert(oldestOplogEntryOpTime);
+ // If ret.second is false, the OpTime we tried to insert already existed.
+ invariant(ret.second,
+ str::stream() << "This oplog entry OpTime already exists in "
+ << "oldestActiveOplogEntryOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
+
+ // Add this OpTime to the oldestNonMajorityCommittedOpTimes set with a finishOpTime of
+ // Timestamp::max() to signify that it has not been committed/aborted.
+ std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime,
+ repl::OpTime::max());
+ auto ret2 = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime);
+ // If ret2.second is false, the OpTime we tried to insert already existed.
+ invariant(ret2.second,
+ str::stream() << "This oplog entry OpTime already exists in "
+ << "oldestNonMajorityCommittedOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
}
-void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) {
- auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS);
- invariant(it != _oldestActiveOplogEntryTS.end(),
- str::stream() << "This oplog entry timestamp does not exist "
- << "or has already been removed."
- << "TS: "
- << oldestOplogEntryTS.toString());
- _oldestActiveOplogEntryTS.erase(it);
+void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime,
+ boost::optional<repl::OpTime> finishOpTime) {
+ auto it = _oldestActiveOplogEntryOpTimes.find(oldestOplogEntryOpTime);
+ invariant(it != _oldestActiveOplogEntryOpTimes.end(),
+ str::stream() << "This oplog entry OpTime does not exist in or has already been "
+ << "removed from oldestActiveOplogEntryOpTimes."
+ << "OpTime: "
+ << oldestOplogEntryOpTime.toString());
+ _oldestActiveOplogEntryOpTimes.erase(it);
+
+ if (!finishOpTime) {
+ return;
+ }
+
+ // The transaction's oldestOplogEntryOpTime now has a corresponding finishTime, which will
+ // be its commit or abort oplog entry OpTime. Add this pair to the
+ // oldestNonMajorityCommittedOpTimes.
+ // Currently, the oldestOplogEntryOpTime will be a prepareOpTime so we will only have a
+ // finishOpTime if we are committing a prepared transaction or aborting an active prepared
+ // transaction.
+ std::pair<repl::OpTime, repl::OpTime> opTimeToRemove(oldestOplogEntryOpTime,
+ repl::OpTime::max());
+ auto it2 = _oldestNonMajorityCommittedOpTimes.find(opTimeToRemove);
+ invariant(it2 != _oldestNonMajorityCommittedOpTimes.end(),
+ str::stream() << "This oplog entry OpTime does not exist in or has already been "
+ << "removed from oldestNonMajorityCommittedOpTimes"
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString());
+ _oldestNonMajorityCommittedOpTimes.erase(it2);
+
+ std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime,
+ *finishOpTime);
+ auto ret = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime);
+ // If ret.second is false, the OpTime we tried to insert already existed.
+ invariant(ret.second,
+ str::stream() << "This oplog entry OpTime pair already exists in "
+ << "oldestNonMajorityCommittedOpTimes."
+ << "oldestOplogEntryOpTime: "
+ << oldestOplogEntryOpTime.toString()
+ << "finishOpTime: "
+ << finishOpTime->toString());
+}
+
+boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestNonMajorityCommittedOpTime()
+ const {
+ if (_oldestNonMajorityCommittedOpTimes.empty()) {
+ return boost::none;
+ }
+ const auto oldestNonMajorityCommittedOpTime = _oldestNonMajorityCommittedOpTimes.begin()->first;
+ invariant(!oldestNonMajorityCommittedOpTime.isNull());
+ return oldestNonMajorityCommittedOpTime;
+}
+
+void ServerTransactionsMetrics::removeOpTimesLessThanOrEqToCommittedOpTime(
+ repl::OpTime committedOpTime) {
+ // Iterate through oldestNonMajorityCommittedOpTimes and remove all pairs whose "finishOpTime"
+ // is now less than or equal to the commit point.
+ for (auto it = _oldestNonMajorityCommittedOpTimes.begin();
+ it != _oldestNonMajorityCommittedOpTimes.end();) {
+ if (it->second <= committedOpTime) {
+ it = _oldestNonMajorityCommittedOpTimes.erase(it);
+ } else {
+ ++it;
+ }
+ }
+}
+
+boost::optional<repl::OpTime>
+ServerTransactionsMetrics::getFinishOpTimeOfOldestNonMajCommitted_forTest() const {
+ if (_oldestNonMajorityCommittedOpTimes.empty()) {
+ return boost::none;
+ }
+ const auto oldestNonMajorityCommittedOpTime =
+ _oldestNonMajorityCommittedOpTimes.begin()->second;
+ return oldestNonMajorityCommittedOpTime;
}
-unsigned int ServerTransactionsMetrics::getTotalActiveTS() const {
- return _oldestActiveOplogEntryTS.size();
+unsigned int ServerTransactionsMetrics::getTotalActiveOpTimes() const {
+ return _oldestActiveOplogEntryOpTimes.size();
}
void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) {