diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2022-06-01 16:35:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-06-01 17:12:39 +0000 |
commit | a5db05d955c38c241cb65fbc3039ec7835b5516f (patch) | |
tree | 140e25e4f0bdded6a01bfe4b2169f386878e5ede /src/mongo/util/concurrency/ticketholder.h | |
parent | 37e28d05ca12d2b3835bbe2145cbe258872e7ed6 (diff) | |
download | mongo-a5db05d955c38c241cb65fbc3039ec7835b5516f.tar.gz |
SERVER-66796 Make TicketHolder metrics independent from Impl
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.h')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 142 |
1 files changed, 72 insertions, 70 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 3e8348a80ac..f521b7989f8 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -57,21 +57,21 @@ public: */ enum WaitMode { kInterruptible, kUninterruptible }; + TicketHolder(int num, ServiceContext* svcCtx) : _outof(num), _serviceContext(svcCtx){}; + virtual ~TicketHolder() = 0; /** * Attempts to acquire a ticket without blocking. * Returns a boolean indicating whether the operation was successful or not. */ - virtual boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) = 0; + boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx); /** * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext * 'opCtx' is killed, throwing an AssertionException. */ - virtual Ticket waitForTicket(OperationContext* opCtx, - AdmissionContext* admCtx, - WaitMode waitMode) = 0; + Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx, WaitMode waitMode); /** * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is @@ -79,71 +79,93 @@ public: * AssertionException if the OperationContext 'opCtx' is killed and no waits for tickets can * proceed. */ - virtual boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) = 0; + boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode); - virtual Status resize(int newSize) = 0; + Status resize(int newSize); virtual int available() const = 0; - virtual int used() const = 0; + virtual int used() const { + return outof() - available(); + } - virtual int outof() const = 0; + virtual int outof() const { + return _outof.loadRelaxed(); + } - virtual void appendStats(BSONObjBuilder& b) const = 0; + virtual int queued() const { + auto removed = _totalRemovedQueue.loadRelaxed(); + auto added = _totalAddedQueue.loadRelaxed(); + return std::max(static_cast<int>(added - removed), 0); + } -private: - virtual void _release(AdmissionContext* admCtx) noexcept = 0; -}; + void appendStats(BSONObjBuilder& b) const; -class SemaphoreTicketHolder final : public TicketHolder { -public: - explicit SemaphoreTicketHolder(int num, ServiceContext* serviceContext); - ~SemaphoreTicketHolder() override final; +private: + virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0; - boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override final; + virtual boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) = 0; - Ticket waitForTicket(OperationContext* opCtx, - AdmissionContext* admCtx, - WaitMode waitMode) override final; + virtual void _appendImplStats(BSONObjBuilder& b) const = 0; - boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) override final; + void _releaseAndUpdateMetrics(AdmissionContext* admCtx) noexcept; + virtual void _release(AdmissionContext* admCtx) noexcept = 0; - Status resize(int newSize) override final; + AtomicWord<std::int64_t> _totalAddedQueue{0}; + AtomicWord<std::int64_t> _totalRemovedQueue{0}; + AtomicWord<std::int64_t> _totalFinishedProcessing{0}; + AtomicWord<std::int64_t> _totalNewAdmissions{0}; + AtomicWord<std::int64_t> _totalTimeProcessingMicros{0}; + AtomicWord<std::int64_t> _totalStartedProcessing{0}; + AtomicWord<std::int64_t> _totalCanceled{0}; - int available() const override final; + Mutex _resizeMutex = + MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "TicketHolder::_resizeMutex"); + AtomicWord<int> _outof; - int used() const override final; +protected: + ServiceContext* _serviceContext; +}; - int outof() const override final; +class SemaphoreTicketHolder final : public TicketHolder { +public: + explicit SemaphoreTicketHolder(int num, ServiceContext* serviceContext); + ~SemaphoreTicketHolder() override final; - void appendStats(BSONObjBuilder& b) const override final; + int available() const override final; private: + boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) override final; + + boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; void _release(AdmissionContext* admCtx) noexcept override final; + void _appendImplStats(BSONObjBuilder& b) const override final; + #if defined(__linux__) mutable sem_t _sem; - // You can read _outof without a lock, but have to hold _resizeMutex to change. - AtomicWord<int> _outof; - Mutex _resizeMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_resizeMutex"); #else bool _tryAcquire(); - AtomicWord<int> _outof; int _num; Mutex _mutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex"); stdx::condition_variable _newTicket; #endif + + // Implementation statistics. + AtomicWord<std::int64_t> _totalTimeQueuedMicros{0}; }; /** @@ -156,46 +178,26 @@ public: explicit FifoTicketHolder(int num, ServiceContext* serviceContext); ~FifoTicketHolder() override final; - boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override final; - - Ticket waitForTicket(OperationContext* opCtx, - AdmissionContext* admCtx, - WaitMode waitMode) override final; - - boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) override final; - - - Status resize(int newSize) override final; - int available() const override final; - int used() const override final; + int queued() const override final { + return _enqueuedElements.load(); + } - int outof() const override final; +private: + boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, + AdmissionContext* admCtx, + Date_t until, + WaitMode waitMode) override final; - int queued() const; + boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; - void appendStats(BSONObjBuilder& b) const override final; + void _appendImplStats(BSONObjBuilder& b) const override final; -private: void _release(AdmissionContext* admCtx) noexcept override final; - void _release(WithLock); - - Mutex _resizeMutex = - MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "FifoTicketHolder::_resizeMutex"); - AtomicWord<int> _capacity; - AtomicWord<std::int64_t> _totalNewAdmissions{0}; - AtomicWord<std::int64_t> _totalAddedQueue{0}; - AtomicWord<std::int64_t> _totalRemovedQueue{0}; + // Implementation statistics. AtomicWord<std::int64_t> _totalTimeQueuedMicros{0}; - AtomicWord<std::int64_t> _totalStartedProcessing{0}; - AtomicWord<std::int64_t> _totalFinishedProcessing{0}; - AtomicWord<std::int64_t> _totalTimeProcessingMicros{0}; - AtomicWord<std::int64_t> _totalCanceled{0}; enum class WaitingState { Waiting, Cancelled, Assigned }; struct WaitingElement { @@ -211,7 +213,6 @@ private: MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(1), "FifoTicketHolder::_queueMutex"); AtomicWord<int> _enqueuedElements; AtomicWord<int> _ticketsAvailable; - ServiceContext* _serviceContext; }; /** @@ -219,6 +220,7 @@ private: * released when going out of scope. */ class Ticket { + friend class TicketHolder; friend class SemaphoreTicketHolder; friend class FifoTicketHolder; @@ -239,7 +241,7 @@ public: ~Ticket() { if (_ticketholder) { - _ticketholder->_release(_admissionContext); + _ticketholder->_releaseAndUpdateMetrics(_admissionContext); } } |