summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder_test.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder_test.cpp')
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp169
1 files changed, 152 insertions, 17 deletions
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index 779b0777991..7651a66b7f2 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -62,6 +62,27 @@ protected:
ServiceContext::UniqueOperationContext _opCtx;
};
+static inline const Seconds kWaitTimeout{2};
+static inline const Milliseconds kSleepTime{1};
+
+/**
+ * Asserts that eventually the predicate does not throw an exception.
+ */
+void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTimeout) {
+ Timer t;
+ while (true) {
+ try {
+ predicate();
+ break;
+ } catch (...) {
+ if (t.elapsed() >= timeout) {
+ throw;
+ }
+ sleepFor(kSleepTime);
+ }
+ }
+}
+
template <class H>
void basicTimeout(OperationContext* opCtx) {
ServiceContext serviceContext;
@@ -185,7 +206,7 @@ struct MockAdmission {
};
template <class H>
-void resizeTest(OperationContext* opCtx) {
+void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) {
// Verify that resize operations don't alter metrics outside of those linked to the number of
// tickets.
ServiceContext serviceContext;
@@ -195,6 +216,16 @@ void resizeTest(OperationContext* opCtx) {
std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext);
Stats stats(holder.get());
+ // An outstanding kImmediate priority operation should not impact resize statistics.
+ MockAdmission immediatePriorityAdmission("immediatePriorityAdmission",
+ getGlobalServiceContext(),
+ AdmissionContext::Priority::kImmediate);
+ if (testWithOutstandingImmediateOperation) {
+ immediatePriorityAdmission.ticket =
+ holder->acquireImmediateTicket(&immediatePriorityAdmission.admCtx);
+ ASSERT(immediatePriorityAdmission.ticket);
+ }
+
AdmissionContext admCtx;
admCtx.setPriority(AdmissionContext::Priority::kNormal);
@@ -242,6 +273,12 @@ TEST_F(TicketHolderTest, ResizeStatsSemaphore) {
TEST_F(TicketHolderTest, ResizeStatsPriority) {
resizeTest<PriorityTicketHolder>(_opCtx.get());
}
+TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) {
+ resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true);
+}
+TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) {
+ resizeTest<PriorityTicketHolder>(_opCtx.get(), true);
+}
TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
ServiceContext serviceContext;
@@ -318,6 +355,13 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
@@ -414,6 +458,13 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2);
ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
TEST_F(TicketHolderTest, PriorityBasicMetrics) {
@@ -454,23 +505,9 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
// Test that the metrics eventually converge to the following set of values. There can be
// cases where the values are incorrect for brief periods of time due to optimistic
// concurrency.
- auto deadline = Date_t::now() + Milliseconds{100};
- while (true) {
- try {
- // ASSERT_EQ(stats["out"], 1);
- ASSERT_EQ(stats["available"], 0);
- // ASSERT_EQ(stats["addedToQueue"], 1);
- // ASSERT_EQ(stats["queueLength"], 1);
- break;
- } catch (...) {
- if (Date_t::now() > deadline) {
- throw;
- }
- // Sleep to allow other threads to process and converge the metrics.
- stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration());
- }
- }
+ assertSoon([&] { ASSERT_EQ(stats["available"], 0); });
}
+
tickSource->advance(Microseconds(100));
lowPriorityAdmission.ticket.reset();
@@ -514,6 +551,13 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
+
// Retake ticket.
holder.waitForTicket(
_opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible);
@@ -528,6 +572,90 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
}
+TEST_F(TicketHolderTest, PrioritImmediateMetrics) {
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+ PriorityTicketHolder holder(1, &serviceContext);
+ Stats stats(&holder);
+
+ MockAdmission lowPriorityAdmission(
+ "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+ ASSERT(lowPriorityAdmission.ticket);
+ {
+ // Test that the metrics eventually converge to the following set of values. There can be
+ // cases where the values are incorrect for brief periods of time due to optimistic
+ // concurrency.
+ assertSoon([&] {
+ ASSERT_EQ(stats["available"], 0);
+ ASSERT_EQ(stats["out"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+ });
+ }
+
+ MockAdmission immediatePriorityAdmission("immediatePriorityAdmission",
+ this->getServiceContext(),
+ AdmissionContext::Priority::kImmediate);
+ immediatePriorityAdmission.ticket =
+ holder.acquireImmediateTicket(&immediatePriorityAdmission.admCtx);
+ ASSERT(immediatePriorityAdmission.ticket);
+
+ {
+ // Test that the metrics eventually converge to the following set of values. There can be
+ // cases where the values are incorrect for brief periods of time due to optimistic
+ // concurrency.
+ assertSoon([&]() {
+ // only reported in the priority specific statistics.
+ ASSERT_EQ(stats["available"], 0);
+ ASSERT_EQ(stats["out"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1);
+
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ });
+ }
+
+ lowPriorityAdmission.ticket.reset();
+
+ tickSource->advance(Microseconds(200));
+
+ assertSoon([&] {
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+ });
+
+ immediatePriorityAdmission.ticket.reset();
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 200);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1);
+}
+
TEST_F(TicketHolderTest, PriorityCanceled) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
@@ -589,5 +717,12 @@ TEST_F(TicketHolderTest, PriorityCanceled) {
ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100);
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
} // namespace