diff options
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder_test.cpp')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 169 |
1 files changed, 152 insertions, 17 deletions
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index 779b0777991..7651a66b7f2 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -62,6 +62,27 @@ protected: ServiceContext::UniqueOperationContext _opCtx; }; +static inline const Seconds kWaitTimeout{2}; +static inline const Milliseconds kSleepTime{1}; + +/** + * Asserts that eventually the predicate does not throw an exception. + */ +void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTimeout) { + Timer t; + while (true) { + try { + predicate(); + break; + } catch (...) { + if (t.elapsed() >= timeout) { + throw; + } + sleepFor(kSleepTime); + } + } +} + template <class H> void basicTimeout(OperationContext* opCtx) { ServiceContext serviceContext; @@ -185,7 +206,7 @@ struct MockAdmission { }; template <class H> -void resizeTest(OperationContext* opCtx) { +void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) { // Verify that resize operations don't alter metrics outside of those linked to the number of // tickets. ServiceContext serviceContext; @@ -195,6 +216,16 @@ void resizeTest(OperationContext* opCtx) { std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext); Stats stats(holder.get()); + // An outstanding kImmediate priority operation should not impact resize statistics. + MockAdmission immediatePriorityAdmission("immediatePriorityAdmission", + getGlobalServiceContext(), + AdmissionContext::Priority::kImmediate); + if (testWithOutstandingImmediateOperation) { + immediatePriorityAdmission.ticket = + holder->acquireImmediateTicket(&immediatePriorityAdmission.admCtx); + ASSERT(immediatePriorityAdmission.ticket); + } + AdmissionContext admCtx; admCtx.setPriority(AdmissionContext::Priority::kNormal); @@ -242,6 +273,12 @@ TEST_F(TicketHolderTest, ResizeStatsSemaphore) { TEST_F(TicketHolderTest, ResizeStatsPriority) { resizeTest<PriorityTicketHolder>(_opCtx.get()); } +TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) { + resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true); +} +TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) { + resizeTest<PriorityTicketHolder>(_opCtx.get(), true); +} TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { ServiceContext serviceContext; @@ -318,6 +355,13 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1); ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { @@ -414,6 +458,13 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2); ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } TEST_F(TicketHolderTest, PriorityBasicMetrics) { @@ -454,23 +505,9 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { // Test that the metrics eventually converge to the following set of values. There can be // cases where the values are incorrect for brief periods of time due to optimistic // concurrency. - auto deadline = Date_t::now() + Milliseconds{100}; - while (true) { - try { - // ASSERT_EQ(stats["out"], 1); - ASSERT_EQ(stats["available"], 0); - // ASSERT_EQ(stats["addedToQueue"], 1); - // ASSERT_EQ(stats["queueLength"], 1); - break; - } catch (...) { - if (Date_t::now() > deadline) { - throw; - } - // Sleep to allow other threads to process and converge the metrics. - stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration()); - } - } + assertSoon([&] { ASSERT_EQ(stats["available"], 0); }); } + tickSource->advance(Microseconds(100)); lowPriorityAdmission.ticket.reset(); @@ -514,6 +551,13 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); + // Retake ticket. holder.waitForTicket( _opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible); @@ -528,6 +572,90 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); } +TEST_F(TicketHolderTest, PrioritImmediateMetrics) { + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + PriorityTicketHolder holder(1, &serviceContext); + Stats stats(&holder); + + MockAdmission lowPriorityAdmission( + "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + ASSERT(lowPriorityAdmission.ticket); + { + // Test that the metrics eventually converge to the following set of values. There can be + // cases where the values are incorrect for brief periods of time due to optimistic + // concurrency. + assertSoon([&] { + ASSERT_EQ(stats["available"], 0); + ASSERT_EQ(stats["out"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + }); + } + + MockAdmission immediatePriorityAdmission("immediatePriorityAdmission", + this->getServiceContext(), + AdmissionContext::Priority::kImmediate); + immediatePriorityAdmission.ticket = + holder.acquireImmediateTicket(&immediatePriorityAdmission.admCtx); + ASSERT(immediatePriorityAdmission.ticket); + + { + // Test that the metrics eventually converge to the following set of values. There can be + // cases where the values are incorrect for brief periods of time due to optimistic + // concurrency. + assertSoon([&]() { + // only reported in the priority specific statistics. + ASSERT_EQ(stats["available"], 0); + ASSERT_EQ(stats["out"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1); + + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + }); + } + + lowPriorityAdmission.ticket.reset(); + + tickSource->advance(Microseconds(200)); + + assertSoon([&] { + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + }); + + immediatePriorityAdmission.ticket.reset(); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 200); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1); +} + TEST_F(TicketHolderTest, PriorityCanceled) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); @@ -589,5 +717,12 @@ TEST_F(TicketHolderTest, PriorityCanceled) { ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100); ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } } // namespace |