diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-09-14 11:53:48 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-14 12:34:13 +0000 |
commit | 40e9c7198a7f0742a2347232166695aae7312286 (patch) | |
tree | 4ebc51d423f3e89eb6c444350510c6a64e46feae /src/mongo/util | |
parent | e3336795ba88f7fd4cea05c917f710bb753def9a (diff) | |
download | mongo-40e9c7198a7f0742a2347232166695aae7312286.tar.gz |
SERVER-68314 Add priority queueing metrics to the ticketholder
Diffstat (limited to 'src/mongo/util')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 128 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 83 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 390 |
3 files changed, 437 insertions, 164 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index 8a098ce561b..b5663a65b54 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -146,28 +146,16 @@ void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const { b.append("out", used()); b.append("available", available()); b.append("totalTickets", outof()); - auto removed = _totalRemovedQueue.loadRelaxed(); - auto added = _totalAddedQueue.loadRelaxed(); - b.append("addedToQueue", added); - b.append("removedFromQueue", removed); - b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); - auto finished = _totalFinishedProcessing.loadRelaxed(); - auto started = _totalStartedProcessing.loadRelaxed(); - b.append("startedProcessing", started); - b.append("processing", std::max(static_cast<int>(started - finished), 0)); - b.append("finishedProcessing", finished); - b.append("totalTimeProcessingMicros", _totalTimeProcessingMicros.loadRelaxed()); - b.append("canceled", _totalCanceled.loadRelaxed()); - b.append("newAdmissions", _totalNewAdmissions.loadRelaxed()); _appendImplStats(b); } void TicketHolderWithQueueingStats::_release(AdmissionContext* admCtx) noexcept { - _totalFinishedProcessing.fetchAndAddRelaxed(1); + auto& queueStats = _getQueueStatsToUse(admCtx); + queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1); auto startTime = admCtx->getStartProcessingTime(); auto tickSource = _serviceContext->getTickSource(); auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks()); - _totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count()); + queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count()); _releaseQueue(admCtx); } @@ -185,11 +173,12 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionConte auto ticket = _tryAcquireImpl(admCtx); // Track statistics. if (ticket) { + auto& queueStats = _getQueueStatsToUse(admCtx); if (admCtx->getAdmissions() == 0) { - _totalNewAdmissions.fetchAndAddRelaxed(1); + queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); } admCtx->start(_serviceContext->getTickSource()); - _totalStartedProcessing.fetchAndAddRelaxed(1); + queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); } return ticket; } @@ -206,15 +195,23 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat return ticket; } - // Track statistics - - _totalAddedQueue.fetchAndAddRelaxed(1); - ON_BLOCK_EXIT([&] { _totalRemovedQueue.fetchAndAddRelaxed(1); }); + auto& queueStats = _getQueueStatsToUse(admCtx); + auto tickSource = _serviceContext->getTickSource(); + auto currentWaitTime = tickSource->getTicks(); + auto updateQueuedTime = [&]() { + auto oldWaitTime = std::exchange(currentWaitTime, tickSource->getTicks()); + auto waitDelta = tickSource->spanTo<Microseconds>(oldWaitTime, currentWaitTime).count(); + queueStats.totalTimeQueuedMicros.fetchAndAddRelaxed(waitDelta); + }; + queueStats.totalAddedQueue.fetchAndAddRelaxed(1); + ON_BLOCK_EXIT([&] { + updateQueuedTime(); + queueStats.totalRemovedQueue.fetchAndAddRelaxed(1); + }); - // Enqueue. ScopeGuard cancelWait([&] { // Update statistics. - _totalCanceled.fetchAndAddRelaxed(1); + queueStats.totalCanceled.fetchAndAddRelaxed(1); }); auto ticket = _waitForTicketUntilImpl(opCtx, admCtx, until, waitMode); @@ -222,17 +219,32 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat if (ticket) { cancelWait.dismiss(); if (admCtx->getAdmissions() == 0) { - _totalNewAdmissions.fetchAndAddRelaxed(1); + queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); } admCtx->start(_serviceContext->getTickSource()); - _totalStartedProcessing.fetchAndAddRelaxed(1); + queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); return ticket; } else { return boost::none; } } - +void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { + auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); + auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); + b.append("addedToQueue", added); + b.append("removedFromQueue", removed); + b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); + auto finished = _semaphoreStats.totalFinishedProcessing.loadRelaxed(); + auto started = _semaphoreStats.totalStartedProcessing.loadRelaxed(); + b.append("startedProcessing", started); + b.append("processing", std::max(static_cast<int>(started - finished), 0)); + b.append("finishedProcessing", finished); + b.append("totalTimeProcessingMicros", _semaphoreStats.totalTimeProcessingMicros.loadRelaxed()); + b.append("canceled", _semaphoreStats.totalCanceled.loadRelaxed()); + b.append("newAdmissions", _semaphoreStats.totalNewAdmissions.loadRelaxed()); + b.append("totalTimeQueuedMicros", _semaphoreStats.totalTimeQueuedMicros.loadRelaxed()); +} #if defined(__linux__) namespace { @@ -287,18 +299,6 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation AdmissionContext* admCtx, Date_t until, WaitMode waitMode) { - - auto tickSource = _serviceContext->getTickSource(); - // Track statistics - auto currentWaitTime = tickSource->getTicks(); - auto updateQueuedTime = [&]() { - auto oldWaitTime = std::exchange(currentWaitTime, tickSource->getTicks()); - auto waitDelta = tickSource->spanTo<Microseconds>(oldWaitTime, currentWaitTime).count(); - _totalTimeQueuedMicros.fetchAndAddRelaxed(waitDelta); - }; - - ON_BLOCK_EXIT(updateQueuedTime); - const Milliseconds intervalMs(500); struct timespec ts; @@ -325,8 +325,6 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation // It is possible to unset 'errno' after a call to checkForInterrupt(). if (waitMode == WaitMode::kInterruptible) opCtx->checkForInterrupt(); - - updateQueuedTime(); } return Ticket{this, admCtx}; } @@ -435,10 +433,6 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { } #endif -void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - b.append("totalTimeQueuedMicros", _totalTimeQueuedMicros.loadRelaxed()); -} - SchedulingTicketHolder::SchedulingTicketHolder(int numTickets, unsigned int numQueues, ServiceContext* serviceContext) @@ -463,7 +457,6 @@ int SchedulingTicketHolder::queued() const { void SchedulingTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept { invariant(admCtx); - // The idea behind the release mechanism consists of a consistent view of queued elements // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking @@ -509,7 +502,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio WaitMode waitMode) { invariant(admCtx); - auto& queue = _getQueueToUse(opCtx, admCtx); + auto& queue = _getQueueToUse(admCtx); bool assigned; { @@ -617,8 +610,44 @@ void PriorityTicketHolder::_dequeueWaitingThread() { } } +void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { + { + BSONObjBuilder bbb(b.subobjStart("lowPriority")); + auto& queueStats = + _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats(); + _appendPriorityStats(bbb, queueStats); + bbb.done(); + } + { + BSONObjBuilder bbb(b.subobjStart("normalPriority")); + auto& queueStats = + _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats(); + _appendPriorityStats(bbb, queueStats); + bbb.done(); + } +} + +void PriorityTicketHolder::_appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const { + auto removed = stats.totalRemovedQueue.loadRelaxed(); + auto added = stats.totalAddedQueue.loadRelaxed(); + + b.append("addedToQueue", added); + b.append("removedFromQueue", removed); + b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); + + auto finished = stats.totalFinishedProcessing.loadRelaxed(); + auto started = stats.totalStartedProcessing.loadRelaxed(); + b.append("startedProcessing", started); + b.append("processing", std::max(static_cast<int>(started - finished), 0)); + b.append("finishedProcessing", finished); + b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); + b.append("canceled", stats.totalCanceled.loadRelaxed()); + b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); + b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); +} + SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse( - OperationContext* opCtx, const AdmissionContext* admCtx) { + const AdmissionContext* admCtx) noexcept { auto priority = admCtx->getPriority(); switch (priority) { case AdmissionContext::Priority::kLow: @@ -629,4 +658,9 @@ SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse( MONGO_UNREACHABLE; } } + +TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( + const AdmissionContext* admCtx) noexcept { + return _getQueueToUse(admCtx).getStatsToUse(); +} } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 9181b944a56..e9846e0173f 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -101,6 +101,9 @@ private: virtual void _release(AdmissionContext* admCtx) noexcept = 0; }; +/** + * A ticketholder which manages both aggregate and policy specific queueing statistics. + */ class TicketHolderWithQueueingStats : public TicketHolder { friend class ReaderWriterTicketHolder; @@ -150,14 +153,29 @@ public: return _outof.loadRelaxed(); } - virtual int queued() const { - auto removed = _totalRemovedQueue.loadRelaxed(); - auto added = _totalAddedQueue.loadRelaxed(); - return std::max(static_cast<int>(added - removed), 0); - } + /** + * Returns the total number of operations queued - regardles of queueing policy. + */ + virtual int queued() const = 0; void appendStats(BSONObjBuilder& b) const override; + /** + * Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a + * loose abstraction for the way in which operations are queued when there are no available + * tickets. + */ + struct QueueStats { + AtomicWord<std::int64_t> totalAddedQueue{0}; + AtomicWord<std::int64_t> totalRemovedQueue{0}; + AtomicWord<std::int64_t> totalFinishedProcessing{0}; + AtomicWord<std::int64_t> totalNewAdmissions{0}; + AtomicWord<std::int64_t> totalTimeProcessingMicros{0}; + AtomicWord<std::int64_t> totalStartedProcessing{0}; + AtomicWord<std::int64_t> totalCanceled{0}; + AtomicWord<std::int64_t> totalTimeQueuedMicros{0}; + }; + private: virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0; @@ -174,13 +192,11 @@ private: virtual void _resize(int newSize, int oldSize) noexcept = 0; - AtomicWord<std::int64_t> _totalAddedQueue{0}; - AtomicWord<std::int64_t> _totalRemovedQueue{0}; - AtomicWord<std::int64_t> _totalFinishedProcessing{0}; - AtomicWord<std::int64_t> _totalNewAdmissions{0}; - AtomicWord<std::int64_t> _totalTimeProcessingMicros{0}; - AtomicWord<std::int64_t> _totalStartedProcessing{0}; - AtomicWord<std::int64_t> _totalCanceled{0}; + /** + * Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue + * specific should be updated through the resulting 'QueueStats'. + */ + virtual QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept = 0; Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2), "TicketHolderWithQueueingStats::_resizeMutex"); @@ -237,7 +253,6 @@ public: private: void _release(AdmissionContext* admCtx) noexcept override final; -private: std::unique_ptr<TicketHolderWithQueueingStats> _reader; std::unique_ptr<TicketHolderWithQueueingStats> _writer; }; @@ -249,6 +264,12 @@ public: int available() const override final; + int queued() const override final { + auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed(); + auto added = _semaphoreStats.totalAddedQueue.loadRelaxed(); + return std::max(static_cast<int>(added - removed), 0); + }; + private: boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, AdmissionContext* admCtx, @@ -262,6 +283,9 @@ private: void _resize(int newSize, int oldSize) noexcept override final; + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final { + return _semaphoreStats; + } #if defined(__linux__) mutable sem_t _sem; @@ -273,9 +297,7 @@ private: MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex"); stdx::condition_variable _newTicket; #endif - - // Implementation statistics. - AtomicWord<std::int64_t> _totalTimeQueuedMicros{0}; + QueueStats _semaphoreStats; }; /** @@ -318,6 +340,19 @@ protected: return _queuedThreads; } + /** + * Returns a reference to the Queue statistics that allows callers to update the statistics. + */ + QueueStats& getStatsToUse() { + return _stats; + } + /** + * Returns a read-only reference to the Queue statistics. + */ + const QueueStats& getStats() const { + return _stats; + } + private: void _signalThreadWoken(); @@ -325,6 +360,7 @@ protected: AtomicWord<int> _threadsToBeWoken{0}; stdx::condition_variable _cv; SchedulingTicketHolder* _holder; + QueueStats _stats; }; std::vector<Queue> _queues; @@ -351,9 +387,10 @@ private: void _releaseQueue(AdmissionContext* admCtx) noexcept override final; - void _appendImplStats(BSONObjBuilder& b) const override final{}; - void _resize(int newSize, int oldSize) noexcept override final; + + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0; + /** * Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket. * Implementors MUST wake at least one waiting thread if at least one thread is pending to be @@ -368,10 +405,12 @@ private: */ virtual void _dequeueWaitingThread() = 0; + void _appendImplStats(BSONObjBuilder& b) const override = 0; + /** * Selects the queue to use for the current thread given the provided arguments. */ - virtual Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) = 0; + virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0; QueueMutex _queueMutex; AtomicWord<int> _ticketsAvailable; @@ -391,8 +430,12 @@ private: }; void _dequeueWaitingThread() override final; + QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final; + + void _appendImplStats(BSONObjBuilder& b) const override final; + void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const; - Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) override final; + Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final; }; /** diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index ccbf831097d..779b0777991 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -168,6 +168,22 @@ private: TicketHolder* _holder; }; +// Mocks an operation submitting for ticket admission. +struct MockAdmission { + MockAdmission(std::string name, + ServiceContext* serviceContext, + AdmissionContext::Priority priority) { + client = serviceContext->makeClient(name); + opCtx = client->makeOperationContext(); + admCtx.setPriority(priority); + } + + AdmissionContext admCtx; + ServiceContext::UniqueClient client; + ServiceContext::UniqueOperationContext opCtx; + boost::optional<Ticket> ticket; +}; + template <class H> void resizeTest(OperationContext* opCtx) { // Verify that resize operations don't alter metrics outside of those linked to the number of @@ -209,7 +225,6 @@ void resizeTest(OperationContext* opCtx) { ASSERT_EQ(stats["out"], 0); ASSERT_EQ(stats["available"], 10); ASSERT_EQ(stats["totalTickets"], 10); - ASSERT_EQ(stats["totalTimeProcessingMicros"], 200); holder->resize(1); newStats = stats.getNonTicketStats(); @@ -237,65 +252,72 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { { // Allocate the only available ticket. Priority is irrelevant when there are tickets // available. - AdmissionContext admCtx; - admCtx.setPriority(AdmissionContext::Priority::kLow); - boost::optional<Ticket> ticket = - holder.waitForTicket(_opCtx.get(), &admCtx, TicketHolder::WaitMode::kInterruptible); - ASSERT(ticket); - - // Create a client corresponding to a low priority operation that will queue. - boost::optional<Ticket> ticketLowPriority; - auto clientLowPriority = this->getServiceContext()->makeClient("clientLowPriority"); - auto opCtxLowPriority = clientLowPriority->makeOperationContext(); - // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must - // survive the lifetime of the ticket. - AdmissionContext admCtxLowPriority; - admCtxLowPriority.setPriority(AdmissionContext::Priority::kLow); - + MockAdmission initialAdmission( + "initialAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + initialAdmission.ticket = holder.waitForTicket(initialAdmission.opCtx.get(), + &initialAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + ASSERT(initialAdmission.ticket); + + MockAdmission lowPriorityAdmission( + "lowPriority", this->getServiceContext(), AdmissionContext::Priority::kLow); stdx::thread lowPriorityThread([&]() { - ticketLowPriority = holder.waitForTicket(opCtxLowPriority.get(), - &admCtxLowPriority, - TicketHolder::WaitMode::kUninterruptible); + lowPriorityAdmission.ticket = + holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); }); - // Create a client corresponding to a normal priority operation that will queue. - boost::optional<Ticket> ticketNormalPriority; - auto clientNormalPriority = this->getServiceContext()->makeClient("clientNormalPriority"); - auto opCtxNormalPriority = clientNormalPriority->makeOperationContext(); - // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must - // survive the lifetime of the ticket. - AdmissionContext admCtxNormalPriority; - admCtxNormalPriority.setPriority(AdmissionContext::Priority::kNormal); - + MockAdmission normalPriorityAdmission( + "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal); stdx::thread normalPriorityThread([&]() { - ticketNormalPriority = holder.waitForTicket(opCtxNormalPriority.get(), - &admCtxNormalPriority, - TicketHolder::WaitMode::kUninterruptible); + normalPriorityAdmission.ticket = + holder.waitForTicket(normalPriorityAdmission.opCtx.get(), + &normalPriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); }); // Wait for the threads to to queue for a ticket. while (holder.queued() < 2) { } - ASSERT_EQ(stats["queueLength"], 2); - ticket.reset(); + initialAdmission.ticket.reset(); // Normal priority thread takes the ticket. normalPriorityThread.join(); - ASSERT_TRUE(ticketNormalPriority); - ASSERT_EQ(stats["removedFromQueue"], 1); - ticketNormalPriority.reset(); + ASSERT_TRUE(normalPriorityAdmission.ticket); + normalPriorityAdmission.ticket.reset(); // Low priority thread takes the ticket. lowPriorityThread.join(); - ASSERT_TRUE(ticketLowPriority); - ASSERT_EQ(stats["removedFromQueue"], 2); - ticketLowPriority.reset(); + ASSERT_TRUE(lowPriorityAdmission.ticket); + lowPriorityAdmission.ticket.reset(); } - ASSERT_EQ(stats["addedToQueue"], 2); - ASSERT_EQ(stats["removedFromQueue"], 2); - ASSERT_EQ(stats["queueLength"], 0); + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 2); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 2); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 2); + ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); + + auto normalPriorityStats = currentStats.getObjectField("normalPriority"); + ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); } TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { @@ -307,67 +329,51 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { { // Allocate the only available ticket. Priority is irrelevant when there are tickets // available. - AdmissionContext admCtx; - admCtx.setPriority(AdmissionContext::Priority::kLow); - boost::optional<Ticket> ticket = - holder.waitForTicket(_opCtx.get(), &admCtx, TicketHolder::WaitMode::kInterruptible); - ASSERT(ticket); - - // Create a client corresponding to a low priority operation that will queue. - boost::optional<Ticket> ticketLowPriority; - auto clientLowPriority = this->getServiceContext()->makeClient("clientLowPriority"); - auto opCtxLowPriority = clientLowPriority->makeOperationContext(); - // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must - // survive the lifetime of the ticket. - AdmissionContext admCtxLowPriority; - admCtxLowPriority.setPriority(AdmissionContext::Priority::kLow); - + MockAdmission initialAdmission( + "initialAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + initialAdmission.ticket = holder.waitForTicket(initialAdmission.opCtx.get(), + &initialAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + ASSERT(initialAdmission.ticket); + + MockAdmission lowPriorityAdmission( + "lowPriority", this->getServiceContext(), AdmissionContext::Priority::kLow); stdx::thread lowPriorityThread([&]() { - ticketLowPriority = holder.waitForTicket(opCtxLowPriority.get(), - &admCtxLowPriority, - TicketHolder::WaitMode::kUninterruptible); + lowPriorityAdmission.ticket = + holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); }); - // Create a client corresponding to a normal priority operation that will queue. - boost::optional<Ticket> ticketNormal1Priority; - auto clientNormal1Priority = this->getServiceContext()->makeClient("clientNormal1Priority"); - auto opCtxNormal1Priority = clientNormal1Priority->makeOperationContext(); - // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must - // survive the lifetime of the ticket. - AdmissionContext admCtxNormal1Priority; - admCtxNormal1Priority.setPriority(AdmissionContext::Priority::kNormal); + MockAdmission normal1PriorityAdmission( + "normal1Priority", this->getServiceContext(), AdmissionContext::Priority::kNormal); stdx::thread normal1PriorityThread([&]() { - ticketNormal1Priority = holder.waitForTicket(opCtxNormal1Priority.get(), - &admCtxNormal1Priority, - TicketHolder::WaitMode::kUninterruptible); + normal1PriorityAdmission.ticket = + holder.waitForTicket(normal1PriorityAdmission.opCtx.get(), + &normal1PriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); }); + // Wait for threads on the queue while (holder.queued() < 2) { } // Release the ticket. - ticket.reset(); + initialAdmission.ticket.reset(); // Normal priority thread takes the ticket normal1PriorityThread.join(); - ASSERT_TRUE(ticketNormal1Priority); - ASSERT_EQ(stats["removedFromQueue"], 1); - - // Create a client corresponding to a second normal priority operation that will be - // prioritized over the queued low priority operation. - boost::optional<Ticket> ticketNormal2Priority; - auto clientNormal2Priority = this->getServiceContext()->makeClient("clientNormal2Priority"); - auto opCtxNormal2Priority = clientNormal2Priority->makeOperationContext(); - // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must - // survive the lifetime of the ticket. - AdmissionContext admCtxNormal2Priority; - admCtxNormal2Priority.setPriority(AdmissionContext::Priority::kNormal); + ASSERT_TRUE(normal1PriorityAdmission.ticket); + + MockAdmission normal2PriorityAdmission( + "normal2Priority", this->getServiceContext(), AdmissionContext::Priority::kNormal); stdx::thread normal2PriorityThread([&]() { - ticketNormal2Priority = holder.waitForTicket(opCtxNormal2Priority.get(), - &admCtxNormal2Priority, - TicketHolder::WaitMode::kUninterruptible); + normal2PriorityAdmission.ticket = + holder.waitForTicket(normal2PriorityAdmission.opCtx.get(), + &normal2PriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); }); // Wait for the new thread on the queue. @@ -375,23 +381,213 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { } // Release the ticket. - ticketNormal1Priority.reset(); + normal1PriorityAdmission.ticket.reset(); // The other normal priority thread takes the ticket. normal2PriorityThread.join(); - ASSERT_TRUE(ticketNormal2Priority); - ASSERT_EQ(stats["removedFromQueue"], 2); - ticketNormal2Priority.reset(); + ASSERT_TRUE(normal2PriorityAdmission.ticket); + normal2PriorityAdmission.ticket.reset(); // Low priority thread takes the ticket. lowPriorityThread.join(); - ASSERT_TRUE(ticketLowPriority); - ASSERT_EQ(stats["removedFromQueue"], 3); - ticketLowPriority.reset(); + ASSERT_TRUE(lowPriorityAdmission.ticket); + lowPriorityAdmission.ticket.reset(); + } + + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 2); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 2); + + auto normalPriorityStats = currentStats.getObjectField("normalPriority"); + ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 2); + ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 2); + ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2); + ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2); +} + +TEST_F(TicketHolderTest, PriorityBasicMetrics) { + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + PriorityTicketHolder holder(1, &serviceContext); + Stats stats(&holder); + + MockAdmission lowPriorityAdmission( + "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + + unittest::Barrier barrierAcquiredTicket(2); + unittest::Barrier barrierReleaseTicket(2); + + stdx::thread waiting([&]() { + // The ticket assigned to this admission is tied to the scope of the thread. Once the thread + // joins, the ticket is released back to the TicketHolder. + MockAdmission normalPriorityAdmission( + "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal); + + normalPriorityAdmission.ticket = + holder.waitForTicket(normalPriorityAdmission.opCtx.get(), + &normalPriorityAdmission.admCtx, + TicketHolder::WaitMode::kUninterruptible); + barrierAcquiredTicket.countDownAndWait(); + barrierReleaseTicket.countDownAndWait(); + }); + + while (holder.queued() == 0) { + // Wait for thread to start waiting. + } + + { + // Test that the metrics eventually converge to the following set of values. There can be + // cases where the values are incorrect for brief periods of time due to optimistic + // concurrency. + auto deadline = Date_t::now() + Milliseconds{100}; + while (true) { + try { + // ASSERT_EQ(stats["out"], 1); + ASSERT_EQ(stats["available"], 0); + // ASSERT_EQ(stats["addedToQueue"], 1); + // ASSERT_EQ(stats["queueLength"], 1); + break; + } catch (...) { + if (Date_t::now() > deadline) { + throw; + } + // Sleep to allow other threads to process and converge the metrics. + stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration()); + } + } + } + tickSource->advance(Microseconds(100)); + lowPriorityAdmission.ticket.reset(); + + while (holder.queued() > 0) { + // Wait for thread to take ticket. } - ASSERT_EQ(stats["addedToQueue"], 3); - ASSERT_EQ(stats["removedFromQueue"], 3); - ASSERT_EQ(stats["queueLength"], 0); + + barrierAcquiredTicket.countDownAndWait(); + tickSource->advance(Microseconds(200)); + barrierReleaseTicket.countDownAndWait(); + + waiting.join(); + ASSERT_EQ(lowPriorityAdmission.admCtx.getAdmissions(), 1); + + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 100); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); + + auto normalPriorityStats = currentStats.getObjectField("normalPriority"); + ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("totalTimeProcessingMicros"), 200); + ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100); + ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); + + // Retake ticket. + holder.waitForTicket( + _opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible); + + ASSERT_EQ(lowPriorityAdmission.admCtx.getAdmissions(), 2); + + currentStats = stats.getStats(); + lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); + + normalPriorityStats = currentStats.getObjectField("normalPriority"); + ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); } +TEST_F(TicketHolderTest, PriorityCanceled) { + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + PriorityTicketHolder holder(1, &serviceContext); + Stats stats(&holder); + { + MockAdmission lowPriorityAdmission( + "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + stdx::thread waiting([&]() { + MockAdmission normalPriorityAdmission( + "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal); + + auto deadline = Date_t::now() + Milliseconds(100); + normalPriorityAdmission.ticket = + holder.waitForTicketUntil(normalPriorityAdmission.opCtx.get(), + &normalPriorityAdmission.admCtx, + deadline, + TicketHolder::WaitMode::kInterruptible); + ASSERT_FALSE(normalPriorityAdmission.ticket); + }); + + while (holder.queued() == 0) { + // Wait for thread to take ticket. + } + + tickSource->advance(Microseconds(100)); + waiting.join(); + } + + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 100); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); + + auto normalPriorityStats = currentStats.getObjectField("normalPriority"); + ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1); + ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100); + ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0); + ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1); +} } // namespace |