summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp128
-rw-r--r--src/mongo/util/concurrency/ticketholder.h83
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp390
3 files changed, 437 insertions, 164 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index 8a098ce561b..b5663a65b54 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -146,28 +146,16 @@ void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const {
b.append("out", used());
b.append("available", available());
b.append("totalTickets", outof());
- auto removed = _totalRemovedQueue.loadRelaxed();
- auto added = _totalAddedQueue.loadRelaxed();
- b.append("addedToQueue", added);
- b.append("removedFromQueue", removed);
- b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
- auto finished = _totalFinishedProcessing.loadRelaxed();
- auto started = _totalStartedProcessing.loadRelaxed();
- b.append("startedProcessing", started);
- b.append("processing", std::max(static_cast<int>(started - finished), 0));
- b.append("finishedProcessing", finished);
- b.append("totalTimeProcessingMicros", _totalTimeProcessingMicros.loadRelaxed());
- b.append("canceled", _totalCanceled.loadRelaxed());
- b.append("newAdmissions", _totalNewAdmissions.loadRelaxed());
_appendImplStats(b);
}
void TicketHolderWithQueueingStats::_release(AdmissionContext* admCtx) noexcept {
- _totalFinishedProcessing.fetchAndAddRelaxed(1);
+ auto& queueStats = _getQueueStatsToUse(admCtx);
+ queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
auto startTime = admCtx->getStartProcessingTime();
auto tickSource = _serviceContext->getTickSource();
auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks());
- _totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count());
+ queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count());
_releaseQueue(admCtx);
}
@@ -185,11 +173,12 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionConte
auto ticket = _tryAcquireImpl(admCtx);
// Track statistics.
if (ticket) {
+ auto& queueStats = _getQueueStatsToUse(admCtx);
if (admCtx->getAdmissions() == 0) {
- _totalNewAdmissions.fetchAndAddRelaxed(1);
+ queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
}
admCtx->start(_serviceContext->getTickSource());
- _totalStartedProcessing.fetchAndAddRelaxed(1);
+ queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
}
return ticket;
}
@@ -206,15 +195,23 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
return ticket;
}
- // Track statistics
-
- _totalAddedQueue.fetchAndAddRelaxed(1);
- ON_BLOCK_EXIT([&] { _totalRemovedQueue.fetchAndAddRelaxed(1); });
+ auto& queueStats = _getQueueStatsToUse(admCtx);
+ auto tickSource = _serviceContext->getTickSource();
+ auto currentWaitTime = tickSource->getTicks();
+ auto updateQueuedTime = [&]() {
+ auto oldWaitTime = std::exchange(currentWaitTime, tickSource->getTicks());
+ auto waitDelta = tickSource->spanTo<Microseconds>(oldWaitTime, currentWaitTime).count();
+ queueStats.totalTimeQueuedMicros.fetchAndAddRelaxed(waitDelta);
+ };
+ queueStats.totalAddedQueue.fetchAndAddRelaxed(1);
+ ON_BLOCK_EXIT([&] {
+ updateQueuedTime();
+ queueStats.totalRemovedQueue.fetchAndAddRelaxed(1);
+ });
- // Enqueue.
ScopeGuard cancelWait([&] {
// Update statistics.
- _totalCanceled.fetchAndAddRelaxed(1);
+ queueStats.totalCanceled.fetchAndAddRelaxed(1);
});
auto ticket = _waitForTicketUntilImpl(opCtx, admCtx, until, waitMode);
@@ -222,17 +219,32 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
if (ticket) {
cancelWait.dismiss();
if (admCtx->getAdmissions() == 0) {
- _totalNewAdmissions.fetchAndAddRelaxed(1);
+ queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
}
admCtx->start(_serviceContext->getTickSource());
- _totalStartedProcessing.fetchAndAddRelaxed(1);
+ queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
return ticket;
} else {
return boost::none;
}
}
-
+void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
+ auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
+ auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
+ b.append("addedToQueue", added);
+ b.append("removedFromQueue", removed);
+ b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
+ auto finished = _semaphoreStats.totalFinishedProcessing.loadRelaxed();
+ auto started = _semaphoreStats.totalStartedProcessing.loadRelaxed();
+ b.append("startedProcessing", started);
+ b.append("processing", std::max(static_cast<int>(started - finished), 0));
+ b.append("finishedProcessing", finished);
+ b.append("totalTimeProcessingMicros", _semaphoreStats.totalTimeProcessingMicros.loadRelaxed());
+ b.append("canceled", _semaphoreStats.totalCanceled.loadRelaxed());
+ b.append("newAdmissions", _semaphoreStats.totalNewAdmissions.loadRelaxed());
+ b.append("totalTimeQueuedMicros", _semaphoreStats.totalTimeQueuedMicros.loadRelaxed());
+}
#if defined(__linux__)
namespace {
@@ -287,18 +299,6 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation
AdmissionContext* admCtx,
Date_t until,
WaitMode waitMode) {
-
- auto tickSource = _serviceContext->getTickSource();
- // Track statistics
- auto currentWaitTime = tickSource->getTicks();
- auto updateQueuedTime = [&]() {
- auto oldWaitTime = std::exchange(currentWaitTime, tickSource->getTicks());
- auto waitDelta = tickSource->spanTo<Microseconds>(oldWaitTime, currentWaitTime).count();
- _totalTimeQueuedMicros.fetchAndAddRelaxed(waitDelta);
- };
-
- ON_BLOCK_EXIT(updateQueuedTime);
-
const Milliseconds intervalMs(500);
struct timespec ts;
@@ -325,8 +325,6 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation
// It is possible to unset 'errno' after a call to checkForInterrupt().
if (waitMode == WaitMode::kInterruptible)
opCtx->checkForInterrupt();
-
- updateQueuedTime();
}
return Ticket{this, admCtx};
}
@@ -435,10 +433,6 @@ void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept {
}
#endif
-void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
- b.append("totalTimeQueuedMicros", _totalTimeQueuedMicros.loadRelaxed());
-}
-
SchedulingTicketHolder::SchedulingTicketHolder(int numTickets,
unsigned int numQueues,
ServiceContext* serviceContext)
@@ -463,7 +457,6 @@ int SchedulingTicketHolder::queued() const {
void SchedulingTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept {
invariant(admCtx);
-
// The idea behind the release mechanism consists of a consistent view of queued elements
// waiting for a ticket and many threads releasing tickets simultaneously. The releasers will
// proceed to attempt to dequeue an element by seeing if there are threads not woken and waking
@@ -509,7 +502,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
WaitMode waitMode) {
invariant(admCtx);
- auto& queue = _getQueueToUse(opCtx, admCtx);
+ auto& queue = _getQueueToUse(admCtx);
bool assigned;
{
@@ -617,8 +610,44 @@ void PriorityTicketHolder::_dequeueWaitingThread() {
}
}
+void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
+ {
+ BSONObjBuilder bbb(b.subobjStart("lowPriority"));
+ auto& queueStats =
+ _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats();
+ _appendPriorityStats(bbb, queueStats);
+ bbb.done();
+ }
+ {
+ BSONObjBuilder bbb(b.subobjStart("normalPriority"));
+ auto& queueStats =
+ _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats();
+ _appendPriorityStats(bbb, queueStats);
+ bbb.done();
+ }
+}
+
+void PriorityTicketHolder::_appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const {
+ auto removed = stats.totalRemovedQueue.loadRelaxed();
+ auto added = stats.totalAddedQueue.loadRelaxed();
+
+ b.append("addedToQueue", added);
+ b.append("removedFromQueue", removed);
+ b.append("queueLength", std::max(static_cast<int>(added - removed), 0));
+
+ auto finished = stats.totalFinishedProcessing.loadRelaxed();
+ auto started = stats.totalStartedProcessing.loadRelaxed();
+ b.append("startedProcessing", started);
+ b.append("processing", std::max(static_cast<int>(started - finished), 0));
+ b.append("finishedProcessing", finished);
+ b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed());
+ b.append("canceled", stats.totalCanceled.loadRelaxed());
+ b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed());
+ b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed());
+}
+
SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(
- OperationContext* opCtx, const AdmissionContext* admCtx) {
+ const AdmissionContext* admCtx) noexcept {
auto priority = admCtx->getPriority();
switch (priority) {
case AdmissionContext::Priority::kLow:
@@ -629,4 +658,9 @@ SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(
MONGO_UNREACHABLE;
}
}
+
+TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
+ const AdmissionContext* admCtx) noexcept {
+ return _getQueueToUse(admCtx).getStatsToUse();
+}
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 9181b944a56..e9846e0173f 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -101,6 +101,9 @@ private:
virtual void _release(AdmissionContext* admCtx) noexcept = 0;
};
+/**
+ * A ticketholder which manages both aggregate and policy specific queueing statistics.
+ */
class TicketHolderWithQueueingStats : public TicketHolder {
friend class ReaderWriterTicketHolder;
@@ -150,14 +153,29 @@ public:
return _outof.loadRelaxed();
}
- virtual int queued() const {
- auto removed = _totalRemovedQueue.loadRelaxed();
- auto added = _totalAddedQueue.loadRelaxed();
- return std::max(static_cast<int>(added - removed), 0);
- }
+ /**
+ * Returns the total number of operations queued - regardles of queueing policy.
+ */
+ virtual int queued() const = 0;
void appendStats(BSONObjBuilder& b) const override;
+ /**
+ * Statistics for queueing mechanisms in the TicketHolder implementations. The term "Queue" is a
+ * loose abstraction for the way in which operations are queued when there are no available
+ * tickets.
+ */
+ struct QueueStats {
+ AtomicWord<std::int64_t> totalAddedQueue{0};
+ AtomicWord<std::int64_t> totalRemovedQueue{0};
+ AtomicWord<std::int64_t> totalFinishedProcessing{0};
+ AtomicWord<std::int64_t> totalNewAdmissions{0};
+ AtomicWord<std::int64_t> totalTimeProcessingMicros{0};
+ AtomicWord<std::int64_t> totalStartedProcessing{0};
+ AtomicWord<std::int64_t> totalCanceled{0};
+ AtomicWord<std::int64_t> totalTimeQueuedMicros{0};
+ };
+
private:
virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0;
@@ -174,13 +192,11 @@ private:
virtual void _resize(int newSize, int oldSize) noexcept = 0;
- AtomicWord<std::int64_t> _totalAddedQueue{0};
- AtomicWord<std::int64_t> _totalRemovedQueue{0};
- AtomicWord<std::int64_t> _totalFinishedProcessing{0};
- AtomicWord<std::int64_t> _totalNewAdmissions{0};
- AtomicWord<std::int64_t> _totalTimeProcessingMicros{0};
- AtomicWord<std::int64_t> _totalStartedProcessing{0};
- AtomicWord<std::int64_t> _totalCanceled{0};
+ /**
+ * Fetches the queueing statistics corresponding to the 'admCtx'. All statistics that are queue
+ * specific should be updated through the resulting 'QueueStats'.
+ */
+ virtual QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept = 0;
Mutex _resizeMutex = MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(2),
"TicketHolderWithQueueingStats::_resizeMutex");
@@ -237,7 +253,6 @@ public:
private:
void _release(AdmissionContext* admCtx) noexcept override final;
-private:
std::unique_ptr<TicketHolderWithQueueingStats> _reader;
std::unique_ptr<TicketHolderWithQueueingStats> _writer;
};
@@ -249,6 +264,12 @@ public:
int available() const override final;
+ int queued() const override final {
+ auto removed = _semaphoreStats.totalRemovedQueue.loadRelaxed();
+ auto added = _semaphoreStats.totalAddedQueue.loadRelaxed();
+ return std::max(static_cast<int>(added - removed), 0);
+ };
+
private:
boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
AdmissionContext* admCtx,
@@ -262,6 +283,9 @@ private:
void _resize(int newSize, int oldSize) noexcept override final;
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final {
+ return _semaphoreStats;
+ }
#if defined(__linux__)
mutable sem_t _sem;
@@ -273,9 +297,7 @@ private:
MONGO_MAKE_LATCH(HierarchicalAcquisitionLevel(0), "SemaphoreTicketHolder::_mutex");
stdx::condition_variable _newTicket;
#endif
-
- // Implementation statistics.
- AtomicWord<std::int64_t> _totalTimeQueuedMicros{0};
+ QueueStats _semaphoreStats;
};
/**
@@ -318,6 +340,19 @@ protected:
return _queuedThreads;
}
+ /**
+ * Returns a reference to the Queue statistics that allows callers to update the statistics.
+ */
+ QueueStats& getStatsToUse() {
+ return _stats;
+ }
+ /**
+ * Returns a read-only reference to the Queue statistics.
+ */
+ const QueueStats& getStats() const {
+ return _stats;
+ }
+
private:
void _signalThreadWoken();
@@ -325,6 +360,7 @@ protected:
AtomicWord<int> _threadsToBeWoken{0};
stdx::condition_variable _cv;
SchedulingTicketHolder* _holder;
+ QueueStats _stats;
};
std::vector<Queue> _queues;
@@ -351,9 +387,10 @@ private:
void _releaseQueue(AdmissionContext* admCtx) noexcept override final;
- void _appendImplStats(BSONObjBuilder& b) const override final{};
-
void _resize(int newSize, int oldSize) noexcept override final;
+
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override = 0;
+
/**
* Wakes up a waiting thread (if it exists) in order for it to attempt to obtain a ticket.
* Implementors MUST wake at least one waiting thread if at least one thread is pending to be
@@ -368,10 +405,12 @@ private:
*/
virtual void _dequeueWaitingThread() = 0;
+ void _appendImplStats(BSONObjBuilder& b) const override = 0;
+
/**
* Selects the queue to use for the current thread given the provided arguments.
*/
- virtual Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) = 0;
+ virtual Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept = 0;
QueueMutex _queueMutex;
AtomicWord<int> _ticketsAvailable;
@@ -391,8 +430,12 @@ private:
};
void _dequeueWaitingThread() override final;
+ QueueStats& _getQueueStatsToUse(const AdmissionContext* admCtx) noexcept override final;
+
+ void _appendImplStats(BSONObjBuilder& b) const override final;
+ void _appendPriorityStats(BSONObjBuilder& b, const QueueStats& stats) const;
- Queue& _getQueueToUse(OperationContext* opCtx, const AdmissionContext* admCtx) override final;
+ Queue& _getQueueToUse(const AdmissionContext* admCtx) noexcept override final;
};
/**
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index ccbf831097d..779b0777991 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -168,6 +168,22 @@ private:
TicketHolder* _holder;
};
+// Mocks an operation submitting for ticket admission.
+struct MockAdmission {
+ MockAdmission(std::string name,
+ ServiceContext* serviceContext,
+ AdmissionContext::Priority priority) {
+ client = serviceContext->makeClient(name);
+ opCtx = client->makeOperationContext();
+ admCtx.setPriority(priority);
+ }
+
+ AdmissionContext admCtx;
+ ServiceContext::UniqueClient client;
+ ServiceContext::UniqueOperationContext opCtx;
+ boost::optional<Ticket> ticket;
+};
+
template <class H>
void resizeTest(OperationContext* opCtx) {
// Verify that resize operations don't alter metrics outside of those linked to the number of
@@ -209,7 +225,6 @@ void resizeTest(OperationContext* opCtx) {
ASSERT_EQ(stats["out"], 0);
ASSERT_EQ(stats["available"], 10);
ASSERT_EQ(stats["totalTickets"], 10);
- ASSERT_EQ(stats["totalTimeProcessingMicros"], 200);
holder->resize(1);
newStats = stats.getNonTicketStats();
@@ -237,65 +252,72 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
{
// Allocate the only available ticket. Priority is irrelevant when there are tickets
// available.
- AdmissionContext admCtx;
- admCtx.setPriority(AdmissionContext::Priority::kLow);
- boost::optional<Ticket> ticket =
- holder.waitForTicket(_opCtx.get(), &admCtx, TicketHolder::WaitMode::kInterruptible);
- ASSERT(ticket);
-
- // Create a client corresponding to a low priority operation that will queue.
- boost::optional<Ticket> ticketLowPriority;
- auto clientLowPriority = this->getServiceContext()->makeClient("clientLowPriority");
- auto opCtxLowPriority = clientLowPriority->makeOperationContext();
- // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must
- // survive the lifetime of the ticket.
- AdmissionContext admCtxLowPriority;
- admCtxLowPriority.setPriority(AdmissionContext::Priority::kLow);
-
+ MockAdmission initialAdmission(
+ "initialAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ initialAdmission.ticket = holder.waitForTicket(initialAdmission.opCtx.get(),
+ &initialAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+ ASSERT(initialAdmission.ticket);
+
+ MockAdmission lowPriorityAdmission(
+ "lowPriority", this->getServiceContext(), AdmissionContext::Priority::kLow);
stdx::thread lowPriorityThread([&]() {
- ticketLowPriority = holder.waitForTicket(opCtxLowPriority.get(),
- &admCtxLowPriority,
- TicketHolder::WaitMode::kUninterruptible);
+ lowPriorityAdmission.ticket =
+ holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
});
- // Create a client corresponding to a normal priority operation that will queue.
- boost::optional<Ticket> ticketNormalPriority;
- auto clientNormalPriority = this->getServiceContext()->makeClient("clientNormalPriority");
- auto opCtxNormalPriority = clientNormalPriority->makeOperationContext();
- // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must
- // survive the lifetime of the ticket.
- AdmissionContext admCtxNormalPriority;
- admCtxNormalPriority.setPriority(AdmissionContext::Priority::kNormal);
-
+ MockAdmission normalPriorityAdmission(
+ "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal);
stdx::thread normalPriorityThread([&]() {
- ticketNormalPriority = holder.waitForTicket(opCtxNormalPriority.get(),
- &admCtxNormalPriority,
- TicketHolder::WaitMode::kUninterruptible);
+ normalPriorityAdmission.ticket =
+ holder.waitForTicket(normalPriorityAdmission.opCtx.get(),
+ &normalPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
});
// Wait for the threads to to queue for a ticket.
while (holder.queued() < 2) {
}
- ASSERT_EQ(stats["queueLength"], 2);
- ticket.reset();
+ initialAdmission.ticket.reset();
// Normal priority thread takes the ticket.
normalPriorityThread.join();
- ASSERT_TRUE(ticketNormalPriority);
- ASSERT_EQ(stats["removedFromQueue"], 1);
- ticketNormalPriority.reset();
+ ASSERT_TRUE(normalPriorityAdmission.ticket);
+ normalPriorityAdmission.ticket.reset();
// Low priority thread takes the ticket.
lowPriorityThread.join();
- ASSERT_TRUE(ticketLowPriority);
- ASSERT_EQ(stats["removedFromQueue"], 2);
- ticketLowPriority.reset();
+ ASSERT_TRUE(lowPriorityAdmission.ticket);
+ lowPriorityAdmission.ticket.reset();
}
- ASSERT_EQ(stats["addedToQueue"], 2);
- ASSERT_EQ(stats["removedFromQueue"], 2);
- ASSERT_EQ(stats["queueLength"], 0);
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 2);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 2);
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 2);
+ ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
+
+ auto normalPriorityStats = currentStats.getObjectField("normalPriority");
+ ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
}
TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
@@ -307,67 +329,51 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
{
// Allocate the only available ticket. Priority is irrelevant when there are tickets
// available.
- AdmissionContext admCtx;
- admCtx.setPriority(AdmissionContext::Priority::kLow);
- boost::optional<Ticket> ticket =
- holder.waitForTicket(_opCtx.get(), &admCtx, TicketHolder::WaitMode::kInterruptible);
- ASSERT(ticket);
-
- // Create a client corresponding to a low priority operation that will queue.
- boost::optional<Ticket> ticketLowPriority;
- auto clientLowPriority = this->getServiceContext()->makeClient("clientLowPriority");
- auto opCtxLowPriority = clientLowPriority->makeOperationContext();
- // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must
- // survive the lifetime of the ticket.
- AdmissionContext admCtxLowPriority;
- admCtxLowPriority.setPriority(AdmissionContext::Priority::kLow);
-
+ MockAdmission initialAdmission(
+ "initialAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ initialAdmission.ticket = holder.waitForTicket(initialAdmission.opCtx.get(),
+ &initialAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+ ASSERT(initialAdmission.ticket);
+
+ MockAdmission lowPriorityAdmission(
+ "lowPriority", this->getServiceContext(), AdmissionContext::Priority::kLow);
stdx::thread lowPriorityThread([&]() {
- ticketLowPriority = holder.waitForTicket(opCtxLowPriority.get(),
- &admCtxLowPriority,
- TicketHolder::WaitMode::kUninterruptible);
+ lowPriorityAdmission.ticket =
+ holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
});
- // Create a client corresponding to a normal priority operation that will queue.
- boost::optional<Ticket> ticketNormal1Priority;
- auto clientNormal1Priority = this->getServiceContext()->makeClient("clientNormal1Priority");
- auto opCtxNormal1Priority = clientNormal1Priority->makeOperationContext();
- // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must
- // survive the lifetime of the ticket.
- AdmissionContext admCtxNormal1Priority;
- admCtxNormal1Priority.setPriority(AdmissionContext::Priority::kNormal);
+ MockAdmission normal1PriorityAdmission(
+ "normal1Priority", this->getServiceContext(), AdmissionContext::Priority::kNormal);
stdx::thread normal1PriorityThread([&]() {
- ticketNormal1Priority = holder.waitForTicket(opCtxNormal1Priority.get(),
- &admCtxNormal1Priority,
- TicketHolder::WaitMode::kUninterruptible);
+ normal1PriorityAdmission.ticket =
+ holder.waitForTicket(normal1PriorityAdmission.opCtx.get(),
+ &normal1PriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
});
+
// Wait for threads on the queue
while (holder.queued() < 2) {
}
// Release the ticket.
- ticket.reset();
+ initialAdmission.ticket.reset();
// Normal priority thread takes the ticket
normal1PriorityThread.join();
- ASSERT_TRUE(ticketNormal1Priority);
- ASSERT_EQ(stats["removedFromQueue"], 1);
-
- // Create a client corresponding to a second normal priority operation that will be
- // prioritized over the queued low priority operation.
- boost::optional<Ticket> ticketNormal2Priority;
- auto clientNormal2Priority = this->getServiceContext()->makeClient("clientNormal2Priority");
- auto opCtxNormal2Priority = clientNormal2Priority->makeOperationContext();
- // Each ticket is assigned with a pointer to an AdmissionContext. The AdmissionContext must
- // survive the lifetime of the ticket.
- AdmissionContext admCtxNormal2Priority;
- admCtxNormal2Priority.setPriority(AdmissionContext::Priority::kNormal);
+ ASSERT_TRUE(normal1PriorityAdmission.ticket);
+
+ MockAdmission normal2PriorityAdmission(
+ "normal2Priority", this->getServiceContext(), AdmissionContext::Priority::kNormal);
stdx::thread normal2PriorityThread([&]() {
- ticketNormal2Priority = holder.waitForTicket(opCtxNormal2Priority.get(),
- &admCtxNormal2Priority,
- TicketHolder::WaitMode::kUninterruptible);
+ normal2PriorityAdmission.ticket =
+ holder.waitForTicket(normal2PriorityAdmission.opCtx.get(),
+ &normal2PriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
});
// Wait for the new thread on the queue.
@@ -375,23 +381,213 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
}
// Release the ticket.
- ticketNormal1Priority.reset();
+ normal1PriorityAdmission.ticket.reset();
// The other normal priority thread takes the ticket.
normal2PriorityThread.join();
- ASSERT_TRUE(ticketNormal2Priority);
- ASSERT_EQ(stats["removedFromQueue"], 2);
- ticketNormal2Priority.reset();
+ ASSERT_TRUE(normal2PriorityAdmission.ticket);
+ normal2PriorityAdmission.ticket.reset();
// Low priority thread takes the ticket.
lowPriorityThread.join();
- ASSERT_TRUE(ticketLowPriority);
- ASSERT_EQ(stats["removedFromQueue"], 3);
- ticketLowPriority.reset();
+ ASSERT_TRUE(lowPriorityAdmission.ticket);
+ lowPriorityAdmission.ticket.reset();
+ }
+
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 2);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 2);
+
+ auto normalPriorityStats = currentStats.getObjectField("normalPriority");
+ ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 2);
+ ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 2);
+ ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2);
+ ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2);
+}
+
+TEST_F(TicketHolderTest, PriorityBasicMetrics) {
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+ PriorityTicketHolder holder(1, &serviceContext);
+ Stats stats(&holder);
+
+ MockAdmission lowPriorityAdmission(
+ "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+
+ unittest::Barrier barrierAcquiredTicket(2);
+ unittest::Barrier barrierReleaseTicket(2);
+
+ stdx::thread waiting([&]() {
+ // The ticket assigned to this admission is tied to the scope of the thread. Once the thread
+ // joins, the ticket is released back to the TicketHolder.
+ MockAdmission normalPriorityAdmission(
+ "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal);
+
+ normalPriorityAdmission.ticket =
+ holder.waitForTicket(normalPriorityAdmission.opCtx.get(),
+ &normalPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kUninterruptible);
+ barrierAcquiredTicket.countDownAndWait();
+ barrierReleaseTicket.countDownAndWait();
+ });
+
+ while (holder.queued() == 0) {
+ // Wait for thread to start waiting.
+ }
+
+ {
+ // Test that the metrics eventually converge to the following set of values. There can be
+ // cases where the values are incorrect for brief periods of time due to optimistic
+ // concurrency.
+ auto deadline = Date_t::now() + Milliseconds{100};
+ while (true) {
+ try {
+ // ASSERT_EQ(stats["out"], 1);
+ ASSERT_EQ(stats["available"], 0);
+ // ASSERT_EQ(stats["addedToQueue"], 1);
+ // ASSERT_EQ(stats["queueLength"], 1);
+ break;
+ } catch (...) {
+ if (Date_t::now() > deadline) {
+ throw;
+ }
+ // Sleep to allow other threads to process and converge the metrics.
+ stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration());
+ }
+ }
+ }
+ tickSource->advance(Microseconds(100));
+ lowPriorityAdmission.ticket.reset();
+
+ while (holder.queued() > 0) {
+ // Wait for thread to take ticket.
}
- ASSERT_EQ(stats["addedToQueue"], 3);
- ASSERT_EQ(stats["removedFromQueue"], 3);
- ASSERT_EQ(stats["queueLength"], 0);
+
+ barrierAcquiredTicket.countDownAndWait();
+ tickSource->advance(Microseconds(200));
+ barrierReleaseTicket.countDownAndWait();
+
+ waiting.join();
+ ASSERT_EQ(lowPriorityAdmission.admCtx.getAdmissions(), 1);
+
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 100);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
+
+ auto normalPriorityStats = currentStats.getObjectField("normalPriority");
+ ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("totalTimeProcessingMicros"), 200);
+ ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100);
+ ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
+
+ // Retake ticket.
+ holder.waitForTicket(
+ _opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible);
+
+ ASSERT_EQ(lowPriorityAdmission.admCtx.getAdmissions(), 2);
+
+ currentStats = stats.getStats();
+ lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
+
+ normalPriorityStats = currentStats.getObjectField("normalPriority");
+ ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
}
+TEST_F(TicketHolderTest, PriorityCanceled) {
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+ PriorityTicketHolder holder(1, &serviceContext);
+ Stats stats(&holder);
+ {
+ MockAdmission lowPriorityAdmission(
+ "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+ stdx::thread waiting([&]() {
+ MockAdmission normalPriorityAdmission(
+ "normalPriority", this->getServiceContext(), AdmissionContext::Priority::kNormal);
+
+ auto deadline = Date_t::now() + Milliseconds(100);
+ normalPriorityAdmission.ticket =
+ holder.waitForTicketUntil(normalPriorityAdmission.opCtx.get(),
+ &normalPriorityAdmission.admCtx,
+ deadline,
+ TicketHolder::WaitMode::kInterruptible);
+ ASSERT_FALSE(normalPriorityAdmission.ticket);
+ });
+
+ while (holder.queued() == 0) {
+ // Wait for thread to take ticket.
+ }
+
+ tickSource->advance(Microseconds(100));
+ waiting.join();
+ }
+
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 100);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
+
+ auto normalPriorityStats = currentStats.getObjectField("normalPriority");
+ ASSERT_EQ(normalPriorityStats.getIntField("addedToQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("removedFromQueue"), 1);
+ ASSERT_EQ(normalPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100);
+ ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0);
+ ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1);
+}
} // namespace