summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.h
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-09-14 11:53:48 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-14 12:34:13 +0000
commit40e9c7198a7f0742a2347232166695aae7312286 (patch)
tree4ebc51d423f3e89eb6c444350510c6a64e46feae /src/mongo/util/concurrency/ticketholder.h
parente3336795ba88f7fd4cea05c917f710bb753def9a (diff)
downloadmongo-40e9c7198a7f0742a2347232166695aae7312286.tar.gz
SERVER-68314 Add priority queueing metrics to the ticketholder
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r--src/mongo/util/concurrency/ticketholder.h83
1 files changed, 63 insertions, 20 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 9181b944a56..e9846e0173f 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -101,6 +101,9 @@ private:
virtual void _release(AdmissionContext* admCtx) noexcept = 0;
};
+/**
+ * A ticketholder which manages both aggregate and policy specific queueing statistics.
+ */
class TicketHolderWithQueueingStats : public TicketHolder {
friend class ReaderWriterTicketHolder;
@@ -150,14 +153,29 @@ public:
return _outof.loadRelaxed();
}
- virtual int queued() const {
- auto removed = _totalRemovedQueue.loadRelaxed();
- auto added = _totalAddedQueue.loadRelaxed();
- return std::max(static_cast<int>(added - removed), 0);
- }
+ /**
+ * Returns the total number of operations queued - regardles of queueing policy.
+ */
+ virtual int queued() const = 0;
void appendStats(BSONObjBuilder& b) const override;
+ /**
+ * Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a
+ * loose abstraction for the way in which operations are queued when there are no available
+ * tickets.
+ */
+ struct QueueStats {
+ AtomicWord<std::int64_t> totalAddedQueue{0};
+ AtomicWord<std::int64_t> totalRemovedQueue{0};
+ AtomicWord<std::int64_t> totalFinishedProcessing{0};
+ AtomicWord<std::int64_t> totalNewAdmissions{0};
+ AtomicWord<std::int64_t> totalTimeProcessingMicros{0};
+ AtomicWord<std::int64_t> totalStartedProcessing{0};
+ AtomicWord<std::int64_t> totalCanceled{0};
+ AtomicWord<std::int64_t> totalTimeQueuedMicros{0};
+ };
+
private:
virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0;
@@ -174,13 +192,11 @@ private:
virtual void _resize(int newSize, int oldSize) noexcept = 0;
- AtomicWord<std::int64_t> _totalAddedQueue{0};
- AtomicWord<std::int64_t> _totalRemovedQueue{0};
- AtomicWord<std::int64_t> _totalFinishedProcessing{0};
- AtomicWord<std::int64_t> _totalNewAdmissions{0};
- AtomicWord<std::int64_t> _totalTimeProcessingMicros{0};
- AtomicWord<std::int64_t> _totalStartedProcessing{0};
- AtomicWord<std::int64_t> _totalCanceled{0};
+ /**
+ * Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue
+ * specific should be updated through the resulting 'QueueStats'.
+ */
+ virtual QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept = 0;
Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2),
"TicketHolderWithQueueingStats::_resizeMutex");
@@ -237,7 +253,6 @@ public:
private:
void _release(AdmissionContext* admCtx) noexcept override final;
-private:
std::unique_ptr<TicketHolderWithQueueingStats> _reader;
std::unique_ptr<TicketHolderWithQueueingStats> _writer;
};
@@ -249,6 +264,12 @@ public:
int available() const override final;
+ int queued() const override final {
+ auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
+ auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
+ return std::max(static_cast<int>(added - removed), 0);
+ };
+
private:
boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
AdmissionContext* admCtx,
@@ -262,6 +283,9 @@ private:
void _resize(int newSize, int oldSize) noexcept override final;
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final {
+ return _semaphoreStats;
+ }
#if defined(__linux__)
mutable sem_t _sem;
@@ -273,9 +297,7 @@ private:
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex");
stdx::condition_variable _newTicket;
#endif
-
- // Implementation statistics.
- AtomicWord<std::int64_t> _totalTimeQueuedMicros{0};
+ QueueStats _semaphoreStats;
};
/**
@@ -318,6 +340,19 @@ protected:
return _queuedThreads;
}
+ /**
+ * Returns a reference to the Queue statistics that allows callers to update the statistics.
+ */
+ QueueStats& getStatsToUse() {
+ return _stats;
+ }
+ /**
+ * Returns a read-only reference to the Queue statistics.
+ */
+ const QueueStats& getStats() const {
+ return _stats;
+ }
+
private:
void _signalThreadWoken();
@@ -325,6 +360,7 @@ protected:
AtomicWord<int> _threadsToBeWoken{0};
stdx::condition_variable _cv;
SchedulingTicketHolder* _holder;
+ QueueStats _stats;
};
std::vector<Queue> _queues;
@@ -351,9 +387,10 @@ private:
void _releaseQueue(AdmissionContext* admCtx) noexcept override final;
- void _appendImplStats(BSONObjBuilder& b) const override final{};
-
void _resize(int newSize, int oldSize) noexcept override final;
+
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0;
+
/**
* Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
* Implementors MUST wake at least one waiting thread if at least one thread is pending to be
@@ -368,10 +405,12 @@ private:
*/
virtual void _dequeueWaitingThread() = 0;
+ void _appendImplStats(BSONObjBuilder& b) const override = 0;
+
/**
* Selects the queue to use for the current thread given the provided arguments.
*/
- virtual Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) = 0;
+ virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0;
QueueMutex _queueMutex;
AtomicWord<int> _ticketsAvailable;
@@ -391,8 +430,12 @@ private:
};
void _dequeueWaitingThread() override final;
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
+
+ void _appendImplStats(BSONObjBuilder& b) const override final;
+ void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const;
- Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) override final;
+ Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final;
};
/**