diff options
author | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-10-30 14:45:38 -0400 |
---|---|---|
committer | Pavi Vetriselvan <pvselvan@umich.edu> | 2018-10-30 14:52:19 -0400 |
commit | 33ac1afd4079e04d12554f9b79d1ab07426caf59 (patch) | |
tree | e18f05ce76cec36901b76ed4cf8f6ba2ba97fb99 /src/mongo/db/server_transactions_metrics.cpp | |
parent | 6d475fdb5a76acab760ce4b6709b60a4c8c9aec6 (diff) | |
download | mongo-33ac1afd4079e04d12554f9b79d1ab07426caf59.tar.gz |
SERVER-35811 pin stable timestamp behind oldest uncommitted timestamp
Diffstat (limited to 'src/mongo/db/server_transactions_metrics.cpp')
-rw-r--r-- | src/mongo/db/server_transactions_metrics.cpp | 120 |
1 files changed, 100 insertions, 20 deletions
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp index 43d1c9d4c9e..3386bc19c42 100644 --- a/src/mongo/db/server_transactions_metrics.cpp +++ b/src/mongo/db/server_transactions_metrics.cpp @@ -113,34 +113,114 @@ void ServerTransactionsMetrics::incrementTotalCommitted() { _totalCommitted.fetchAndAdd(1); } -boost::optional<Timestamp> ServerTransactionsMetrics::getOldestActiveTS() const { - if (_oldestActiveOplogEntryTS.empty()) { +boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const { + if (_oldestActiveOplogEntryOpTimes.empty()) { return boost::none; } - return *(_oldestActiveOplogEntryTS.begin()); + return *(_oldestActiveOplogEntryOpTimes.begin()); } -void ServerTransactionsMetrics::addActiveTS(Timestamp oldestOplogEntryTS) { - auto ret = _oldestActiveOplogEntryTS.insert(oldestOplogEntryTS); - // If ret.second is false, the timestamp we tried to insert already existed. - invariant(ret.second == true, - str::stream() << "This oplog entry timestamp already exists." - << "TS: " - << oldestOplogEntryTS.toString()); +void ServerTransactionsMetrics::addActiveOpTime(repl::OpTime oldestOplogEntryOpTime) { + auto ret = _oldestActiveOplogEntryOpTimes.insert(oldestOplogEntryOpTime); + // If ret.second is false, the OpTime we tried to insert already existed. + invariant(ret.second, + str::stream() << "This oplog entry OpTime already exists in " + << "oldestActiveOplogEntryOpTimes." + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); + + // Add this OpTime to the oldestNonMajorityCommittedOpTimes set with a finishOpTime of + // Timestamp::max() to signify that it has not been committed/aborted. + std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, + repl::OpTime::max()); + auto ret2 = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); + // If ret2.second is false, the OpTime we tried to insert already existed. + invariant(ret2.second, + str::stream() << "This oplog entry OpTime already exists in " + << "oldestNonMajorityCommittedOpTimes." + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); } -void ServerTransactionsMetrics::removeActiveTS(Timestamp oldestOplogEntryTS) { - auto it = _oldestActiveOplogEntryTS.find(oldestOplogEntryTS); - invariant(it != _oldestActiveOplogEntryTS.end(), - str::stream() << "This oplog entry timestamp does not exist " - << "or has already been removed." - << "TS: " - << oldestOplogEntryTS.toString()); - _oldestActiveOplogEntryTS.erase(it); +void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime, + boost::optional<repl::OpTime> finishOpTime) { + auto it = _oldestActiveOplogEntryOpTimes.find(oldestOplogEntryOpTime); + invariant(it != _oldestActiveOplogEntryOpTimes.end(), + str::stream() << "This oplog entry OpTime does not exist in or has already been " + << "removed from oldestActiveOplogEntryOpTimes." + << "OpTime: " + << oldestOplogEntryOpTime.toString()); + _oldestActiveOplogEntryOpTimes.erase(it); + + if (!finishOpTime) { + return; + } + + // The transaction's oldestOplogEntryOpTime now has a corresponding finishTime, which will + // be its commit or abort oplog entry OpTime. Add this pair to the + // oldestNonMajorityCommittedOpTimes. + // Currently, the oldestOplogEntryOpTime will be a prepareOpTime so we will only have a + // finishOpTime if we are committing a prepared transaction or aborting an active prepared + // transaction. + std::pair<repl::OpTime, repl::OpTime> opTimeToRemove(oldestOplogEntryOpTime, + repl::OpTime::max()); + auto it2 = _oldestNonMajorityCommittedOpTimes.find(opTimeToRemove); + invariant(it2 != _oldestNonMajorityCommittedOpTimes.end(), + str::stream() << "This oplog entry OpTime does not exist in or has already been " + << "removed from oldestNonMajorityCommittedOpTimes" + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString()); + _oldestNonMajorityCommittedOpTimes.erase(it2); + + std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, + *finishOpTime); + auto ret = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); + // If ret.second is false, the OpTime we tried to insert already existed. + invariant(ret.second, + str::stream() << "This oplog entry OpTime pair already exists in " + << "oldestNonMajorityCommittedOpTimes." + << "oldestOplogEntryOpTime: " + << oldestOplogEntryOpTime.toString() + << "finishOpTime: " + << finishOpTime->toString()); +} + +boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestNonMajorityCommittedOpTime() + const { + if (_oldestNonMajorityCommittedOpTimes.empty()) { + return boost::none; + } + const auto oldestNonMajorityCommittedOpTime = _oldestNonMajorityCommittedOpTimes.begin()->first; + invariant(!oldestNonMajorityCommittedOpTime.isNull()); + return oldestNonMajorityCommittedOpTime; +} + +void ServerTransactionsMetrics::removeOpTimesLessThanOrEqToCommittedOpTime( + repl::OpTime committedOpTime) { + // Iterate through oldestNonMajorityCommittedOpTimes and remove all pairs whose "finishOpTime" + // is now less than or equal to the commit point. + for (auto it = _oldestNonMajorityCommittedOpTimes.begin(); + it != _oldestNonMajorityCommittedOpTimes.end();) { + if (it->second <= committedOpTime) { + it = _oldestNonMajorityCommittedOpTimes.erase(it); + } else { + ++it; + } + } +} + +boost::optional<repl::OpTime> +ServerTransactionsMetrics::getFinishOpTimeOfOldestNonMajCommitted_forTest() const { + if (_oldestNonMajorityCommittedOpTimes.empty()) { + return boost::none; + } + const auto oldestNonMajorityCommittedOpTime = + _oldestNonMajorityCommittedOpTimes.begin()->second; + return oldestNonMajorityCommittedOpTime; } -unsigned int ServerTransactionsMetrics::getTotalActiveTS() const { - return _oldestActiveOplogEntryTS.size(); +unsigned int ServerTransactionsMetrics::getTotalActiveOpTimes() const { + return _oldestActiveOplogEntryOpTimes.size(); } void ServerTransactionsMetrics::updateStats(TransactionsStats* stats) { |