diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-09-14 11:53:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-14 12:34:13 +0000 |
commit | 40e9c7198a7f0742a2347232166695aae7312286 (patch) | |
tree | 4ebc51d423f3e89eb6c444350510c6a64e46feae /src/mongo/util/concurrency/ticketholder.h | |
parent | e3336795ba88f7fd4cea05c917f710bb753def9a (diff) | |
download | mongo-40e9c7198a7f0742a2347232166695aae7312286.tar.gz |
SERVER-68314 Add priority queueing metrics to the ticketholder
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 83 |
1 files changed, 63 insertions, 20 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 9181b944a56..e9846e0173f 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -101,6 +101,9 @@ private: virtual void _release(AdmissionContext* admCtx) noexcept = 0; }; +/** + * A ticketholder which manages both aggregate and policy specific queueing statistics. + */ class TicketHolderWithQueueingStats : public TicketHolder { friend class ReaderWriterTicketHolder; @@ -150,14 +153,29 @@ public: return _outof.loadRelaxed(); } - virtual int queued() const { - auto removed = _totalRemovedQueue.loadRelaxed(); - auto added = _totalAddedQueue.loadRelaxed(); - return std::max(static_cast<int>(added - removed), 0); - } + /** + * Returns the total number of operations queued - regardles of queueing policy. + */ + virtual int queued() const = 0; void appendStats(BSONObjBuilder& b) const override; + /** + * Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a + * loose abstraction for the way in which operations are queued when there are no available + * tickets. + */ + struct QueueStats { + AtomicWord<std::int64_t> totalAddedQueue{0}; + AtomicWord<std::int64_t> totalRemovedQueue{0}; + AtomicWord<std::int64_t> totalFinishedProcessing{0}; + AtomicWord<std::int64_t> totalNewAdmissions{0}; + AtomicWord<std::int64_t> totalTimeProcessingMicros{0}; + AtomicWord<std::int64_t> totalStartedProcessing{0}; + AtomicWord<std::int64_t> totalCanceled{0}; + AtomicWord<std::int64_t> totalTimeQueuedMicros{0}; + }; + private: virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0; @@ -174,13 +192,11 @@ private: virtual void _resize(int newSize, int oldSize) noexcept = 0; - AtomicWord<std::int64_t> _totalAddedQueue{0}; - AtomicWord<std::int64_t> _totalRemovedQueue{0}; - AtomicWord<std::int64_t> _totalFinishedProcessing{0}; - AtomicWord<std::int64_t> _totalNewAdmissions{0}; - AtomicWord<std::int64_t> _totalTimeProcessingMicros{0}; - AtomicWord<std::int64_t> _totalStartedProcessing{0}; - AtomicWord<std::int64_t> _totalCanceled{0}; + /** + * Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue + * specific should be updated through the resulting 'QueueStats'. + */ + virtual QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept = 0; Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "TicketHolderWithQueueingStats::_resizeMutex"); @@ -237,7 +253,6 @@ public: private: void _release(AdmissionContext* admCtx) noexcept override final; -private: std::unique_ptr<TicketHolderWithQueueingStats> _reader; std::unique_ptr<TicketHolderWithQueueingStats> _writer; }; @@ -249,6 +264,12 @@ public: int available() const override final; + int queued() const override final { + auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); + auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); + return std::max(static_cast<int>(added - removed), 0); + }; + private: boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, AdmissionContext* admCtx, @@ -262,6 +283,9 @@ private: void _resize(int newSize, int oldSize) noexcept override final; + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final { + return _semaphoreStats; + } #if defined(__linux__) mutable sem_t _sem; @@ -273,9 +297,7 @@ private: MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex"); stdx::condition_variable _newTicket; #endif - - // Implementation statistics. - AtomicWord<std::int64_t> _totalTimeQueuedMicros{0}; + QueueStats _semaphoreStats; }; /** @@ -318,6 +340,19 @@ protected: return _queuedThreads; } + /** + * Returns a reference to the Queue statistics that allows callers to update the statistics. + */ + QueueStats& getStatsToUse() { + return _stats; + } + /** + * Returns a read-only reference to the Queue statistics. + */ + const QueueStats& getStats() const { + return _stats; + } + private: void _signalThreadWoken(); @@ -325,6 +360,7 @@ protected: AtomicWord<int> _threadsToBeWoken{0}; stdx::condition_variable _cv; SchedulingTicketHolder* _holder; + QueueStats _stats; }; std::vector<Queue> _queues; @@ -351,9 +387,10 @@ private: void _releaseQueue(AdmissionContext* admCtx) noexcept override final; - void _appendImplStats(BSONObjBuilder& b) const override final{}; - void _resize(int newSize, int oldSize) noexcept override final; + + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0; + /** * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket. * Implementors MUST wake at least one waiting thread if at least one thread is pending to be @@ -368,10 +405,12 @@ private: */ virtual void _dequeueWaitingThread() = 0; + void _appendImplStats(BSONObjBuilder& b) const override = 0; + /** * Selects the queue to use for the current thread given the provided arguments. */ - virtual Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) = 0; + virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0; QueueMutex _queueMutex; AtomicWord<int> _ticketsAvailable; @@ -391,8 +430,12 @@ private: }; void _dequeueWaitingThread() override final; + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; + + void _appendImplStats(BSONObjBuilder& b) const override final; + void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const; - Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) override final; + Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final; }; /** |