diff options
Diffstat (limited to 'src/mongo/db/server_transactions_metrics.cpp')
-rw-r--r-- | src/mongo/db/server_transactions_metrics.cpp | 119 |
1 files changed, 13 insertions, 106 deletions
diff --git a/src/mongo/db/server_transactions_metrics.cpp b/src/mongo/db/server_transactions_metrics.cpp index a46a663e0b1..5696b7079be 100644 --- a/src/mongo/db/server_transactions_metrics.cpp +++ b/src/mongo/db/server_transactions_metrics.cpp @@ -149,8 +149,9 @@ void ServerTransactionsMetrics::decrementCurrentPrepared() { _currentPrepared.fetchAndSubtract(1); } -boost::optional<repl::OpTime> ServerTransactionsMetrics::_calculateOldestActiveOpTime( - WithLock) const { + +boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const { + stdx::lock_guard<stdx::mutex> lm(_mutex); if (_oldestActiveOplogEntryOpTimes.empty()) { return boost::none; } @@ -162,110 +163,20 @@ void ServerTransactionsMetrics::addActiveOpTime(repl::OpTime oldestOplogEntryOpT auto ret = _oldestActiveOplogEntryOpTimes.insert(oldestOplogEntryOpTime); // If ret.second is false, the OpTime we tried to insert already existed. invariant(ret.second, - str::stream() << "This oplog entry OpTime already exists in " - << "oldestActiveOplogEntryOpTimes." + str::stream() << "This oplog entry OpTime already exists." << "oldestOplogEntryOpTime: " << oldestOplogEntryOpTime.toString()); - - // Add this OpTime to the oldestNonMajorityCommittedOpTimes set with a finishOpTime of - // Timestamp::max() to signify that it has not been committed/aborted. - std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, - repl::OpTime::max()); - auto ret2 = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); - // If ret2.second is false, the OpTime we tried to insert already existed. - invariant(ret2.second, - str::stream() << "This oplog entry OpTime already exists in " - << "oldestNonMajorityCommittedOpTimes." - << "oldestOplogEntryOpTime: " - << oldestOplogEntryOpTime.toString()); - _oldestActiveOplogEntryOpTime = _calculateOldestActiveOpTime(lm); } -void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime, - boost::optional<repl::OpTime> finishOpTime) { +void ServerTransactionsMetrics::removeActiveOpTime(repl::OpTime oldestOplogEntryOpTime) { stdx::lock_guard<stdx::mutex> lm(_mutex); auto it = _oldestActiveOplogEntryOpTimes.find(oldestOplogEntryOpTime); invariant(it != _oldestActiveOplogEntryOpTimes.end(), - str::stream() << "This oplog entry OpTime does not exist in or has already been " - << "removed from oldestActiveOplogEntryOpTimes." - << "OpTime: " - << oldestOplogEntryOpTime.toString()); - _oldestActiveOplogEntryOpTimes.erase(it); - - if (!finishOpTime) { - return; - } - - // The transaction's oldestOplogEntryOpTime now has a corresponding finishTime, which will - // be its commit or abort oplog entry OpTime. Add this pair to the - // oldestNonMajorityCommittedOpTimes. - // Currently, the oldestOplogEntryOpTime will be a prepareOpTime so we will only have a - // finishOpTime if we are committing a prepared transaction or aborting an active prepared - // transaction. - std::pair<repl::OpTime, repl::OpTime> opTimeToRemove(oldestOplogEntryOpTime, - repl::OpTime::max()); - auto it2 = _oldestNonMajorityCommittedOpTimes.find(opTimeToRemove); - invariant(it2 != _oldestNonMajorityCommittedOpTimes.end(), - str::stream() << "This oplog entry OpTime does not exist in or has already been " - << "removed from oldestNonMajorityCommittedOpTimes" + str::stream() << "This oplog entry OpTime does not exist " + << "or has already been removed." << "oldestOplogEntryOpTime: " << oldestOplogEntryOpTime.toString()); - _oldestNonMajorityCommittedOpTimes.erase(it2); - - std::pair<repl::OpTime, repl::OpTime> nonMajCommittedOpTime(oldestOplogEntryOpTime, - *finishOpTime); - auto ret = _oldestNonMajorityCommittedOpTimes.insert(nonMajCommittedOpTime); - // If ret.second is false, the OpTime we tried to insert already existed. - invariant(ret.second, - str::stream() << "This oplog entry OpTime pair already exists in " - << "oldestNonMajorityCommittedOpTimes." - << "oldestOplogEntryOpTime: " - << oldestOplogEntryOpTime.toString() - << "finishOpTime: " - << finishOpTime->toString()); - _oldestActiveOplogEntryOpTime = _calculateOldestActiveOpTime(lm); -} - -boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestNonMajorityCommittedOpTime() - const { - stdx::lock_guard<stdx::mutex> lm(_mutex); - if (_oldestNonMajorityCommittedOpTimes.empty()) { - return boost::none; - } - const auto oldestNonMajorityCommittedOpTime = _oldestNonMajorityCommittedOpTimes.begin()->first; - invariant(!oldestNonMajorityCommittedOpTime.isNull()); - return oldestNonMajorityCommittedOpTime; -} - -void ServerTransactionsMetrics::removeOpTimesLessThanOrEqToCommittedOpTime( - repl::OpTime committedOpTime) { - stdx::lock_guard<stdx::mutex> lm(_mutex); - // Iterate through oldestNonMajorityCommittedOpTimes and remove all pairs whose "finishOpTime" - // is now less than or equal to the commit point. - for (auto it = _oldestNonMajorityCommittedOpTimes.begin(); - it != _oldestNonMajorityCommittedOpTimes.end();) { - if (it->second <= committedOpTime) { - it = _oldestNonMajorityCommittedOpTimes.erase(it); - } else { - ++it; - } - } -} - -boost::optional<repl::OpTime> -ServerTransactionsMetrics::getFinishOpTimeOfOldestNonMajCommitted_forTest() const { - stdx::lock_guard<stdx::mutex> lm(_mutex); - if (_oldestNonMajorityCommittedOpTimes.empty()) { - return boost::none; - } - const auto oldestNonMajorityCommittedOpTime = - _oldestNonMajorityCommittedOpTimes.begin()->second; - return oldestNonMajorityCommittedOpTime; -} - -boost::optional<repl::OpTime> ServerTransactionsMetrics::getOldestActiveOpTime() const { - stdx::lock_guard<stdx::mutex> lm(_mutex); - return _oldestActiveOplogEntryOpTime; + _oldestActiveOplogEntryOpTimes.erase(it); } unsigned int ServerTransactionsMetrics::getTotalActiveOpTimes() const { @@ -294,21 +205,17 @@ void ServerTransactionsMetrics::updateStats(TransactionsStats* stats, OperationC stats->setCurrentPrepared(_currentPrepared.load()); stats->setOldestOpenUnpreparedReadTimestamp( ServerTransactionsMetrics::_getOldestOpenUnpreparedReadTimestamp(opCtx)); - // Acquire _mutex before reading _oldestActiveOplogEntryOpTime. - stdx::lock_guard<stdx::mutex> lm(_mutex); - // To avoid compression loss, we use the null OpTime if no oldest active transaction optime is - // stored. - repl::OpTime oldestActiveOplogEntryOpTime = (_oldestActiveOplogEntryOpTime != boost::none) - ? _oldestActiveOplogEntryOpTime.get() - : repl::OpTime(); + // To avoid compression loss, we use the null OpTime if no oldest active transaction + // optime is stored. + auto oldestActiveOpTime = getOldestActiveOpTime(); + repl::OpTime oldestActiveOplogEntryOpTime = + (oldestActiveOpTime != boost::none) ? oldestActiveOpTime.get() : repl::OpTime(); stats->setOldestActiveOplogEntryOpTime(oldestActiveOplogEntryOpTime); } void ServerTransactionsMetrics::clearOpTimes() { stdx::lock_guard<stdx::mutex> lm(_mutex); - _oldestActiveOplogEntryOpTime = boost::none; _oldestActiveOplogEntryOpTimes.clear(); - _oldestNonMajorityCommittedOpTimes.clear(); } namespace { |