summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorHaley Connelly <haley.connelly@mongodb.com>2022-09-22 12:13:59 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-09-22 13:42:01 +0000
commit3c408327dc577ca60c9c1f1c2aa08ffdc4942611 (patch)
treefcffdb72bb5174c20f26d00b012fb69e17639776
parent6dae6fed07e6d99cf23597101bcf7f9716c7f498 (diff)
downloadmongo-3c408327dc577ca60c9c1f1c2aa08ffdc4942611.tar.gz
SERVER-67951 Record AdmissionContext::Priority::kImmediate statistics
-rw-r--r--src/mongo/db/concurrency/lock_state.cpp12
-rw-r--r--src/mongo/db/concurrency/lock_state_test.cpp12
-rw-r--r--src/mongo/db/concurrency/locker.h2
-rw-r--r--src/mongo/util/concurrency/admission_context.h12
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp149
-rw-r--r--src/mongo/util/concurrency/ticketholder.h125
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp169
7 files changed, 363 insertions, 118 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp
index 69ea7d13bfd..9a49e3aa462 100644
--- a/src/mongo/db/concurrency/lock_state.cpp
+++ b/src/mongo/db/concurrency/lock_state.cpp
@@ -368,10 +368,15 @@ void LockerImpl::reacquireTicket(OperationContext* opCtx) {
bool LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t deadline) {
_admCtx.setLockMode(mode);
+
+ // Upon startup, the holder is not guaranteed to be initialized.
+ auto holder = _ticketHolder;
const bool reader = isSharedLockMode(mode);
- auto holder = shouldAcquireTicket() ? _ticketHolder : nullptr;
- // MODE_X is exclusive of all other locks, thus acquiring a ticket is unnecessary.
- if (mode != MODE_X && mode != MODE_NONE && holder) {
+
+ if (!shouldWaitForTicket() && holder) {
+ _ticket = holder->acquireImmediateTicket(&_admCtx);
+ } else if (mode != MODE_X && mode != MODE_NONE && holder) {
+ // MODE_X is exclusive of all other locks, thus acquiring a ticket is unnecessary.
_clientState.store(reader ? kQueuedReader : kQueuedWriter);
// If the ticket wait is interrupted, restore the state of the client.
ScopeGuard restoreStateOnErrorGuard([&] {
@@ -396,6 +401,7 @@ bool LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t d
}
restoreStateOnErrorGuard.dismiss();
}
+
_clientState.store(reader ? kActiveReader : kActiveWriter);
return true;
}
diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp
index d5750316327..607c2c076f7 100644
--- a/src/mongo/db/concurrency/lock_state_test.cpp
+++ b/src/mongo/db/concurrency/lock_state_test.cpp
@@ -1241,27 +1241,27 @@ TEST_F(LockerImplTest, SetTicketAcquisitionForLockRAIIType) {
auto opCtx = makeOperationContext();
// By default, ticket acquisition is required.
- ASSERT_TRUE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_TRUE(opCtx->lockState()->shouldWaitForTicket());
{
SetTicketAquisitionPriorityForLock setTicketAquisition(
opCtx.get(), AdmissionContext::Priority::kImmediate);
- ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket());
}
- ASSERT_TRUE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_TRUE(opCtx->lockState()->shouldWaitForTicket());
// If ticket acquisitions are disabled on the lock state, the RAII type has no effect.
opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate);
- ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket());
{
SetTicketAquisitionPriorityForLock setTicketAquisition(
opCtx.get(), AdmissionContext::Priority::kImmediate);
- ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket());
}
- ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket());
+ ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket());
}
// This test exercises the lock dumping code in ~LockerImpl in case locks are held on destruction.
diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h
index d29774e39a8..0f3e760b54d 100644
--- a/src/mongo/db/concurrency/locker.h
+++ b/src/mongo/db/concurrency/locker.h
@@ -521,7 +521,7 @@ public:
return _admCtx.getPriority();
}
- bool shouldAcquireTicket() const {
+ bool shouldWaitForTicket() const {
return _admCtx.getPriority() != AdmissionContext::Priority::kImmediate;
}
diff --git a/src/mongo/util/concurrency/admission_context.h b/src/mongo/util/concurrency/admission_context.h
index 304a57dc434..8eddb3cf657 100644
--- a/src/mongo/util/concurrency/admission_context.h
+++ b/src/mongo/util/concurrency/admission_context.h
@@ -57,13 +57,11 @@ public:
* user and internal, should use this priority unless they qualify as 'kLow' or 'kImmediate'
* priority.
*
- * 'kImmediate': It's crucial that the operation makes forward progress - bypassing ticket
- * acquisition. Reserved for operations critical to availability (e.g. replication workers) or
- * observability (e.g. FTDC), and any operation that is releasing resources (e.g. committing or
- * aborting prepared transactions). Should be used sparingly.
- *
- * TODO SERVER-67951: Update comment to address that kImmediate priority operations are always
- * granted a ticket immediately upon request.
+ * 'kImmediate': It's crucial that the operation makes forward progress - and acquire a ticket
+ * immediately upon request, without waiting.
+ * Reserved for operations critical to availability (e.g. replication workers) or observability
+ * (e.g. FTDC), and any operation that is releasing resources (e.g. committing or aborting
+ * prepared transactions). Should be used sparingly.
*/
enum class Priority { kLow, kNormal, kImmediate };
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index b5663a65b54..d910f292129 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -41,12 +41,33 @@
#define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault
+namespace mongo {
+
namespace {
const auto ticketHolderDecoration =
mongo::ServiceContext::declareDecoration<std::unique_ptr<mongo::TicketHolder>>();
+
+void updateQueueStatsOnRelease(ServiceContext* serviceContext,
+ TicketHolderWithQueueingStats::QueueStats& queueStats,
+ AdmissionContext* admCtx) {
+ queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
+ auto startTime = admCtx->getStartProcessingTime();
+ auto tickSource = serviceContext->getTickSource();
+ auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks());
+ queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count());
}
-namespace mongo {
+void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext,
+ TicketHolderWithQueueingStats::QueueStats& queueStats,
+ AdmissionContext* admCtx) {
+ if (admCtx->getAdmissions() == 0) {
+ queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
+ }
+ admCtx->start(serviceContext->getTickSource());
+ queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
+}
+
+} // namespace
TicketHolder* TicketHolder::get(ServiceContext* svcCtx) {
return ticketHolderDecoration(svcCtx).get();
@@ -58,6 +79,21 @@ void TicketHolder::use(ServiceContext* svcCtx, std::unique_ptr<TicketHolder> new
ReaderWriterTicketHolder::~ReaderWriterTicketHolder(){};
+Ticket ReaderWriterTicketHolder::acquireImmediateTicket(AdmissionContext* admCtx) {
+ switch (admCtx->getLockMode()) {
+ case MODE_IS:
+ case MODE_S:
+ return _reader->acquireImmediateTicket(admCtx);
+ case MODE_IX:
+ return _writer->acquireImmediateTicket(admCtx);
+ default:
+ // Tickets are linked to the GlobalLock and a MODE_X lock is already exclusive - all
+ // other operations waiting for MODE_X will be blocked so no need to go through the
+ // ticketing mechanism.
+ MONGO_UNREACHABLE;
+ }
+}
+
boost::optional<Ticket> ReaderWriterTicketHolder::tryAcquire(AdmissionContext* admCtx) {
switch (admCtx->getLockMode()) {
@@ -115,13 +151,25 @@ void ReaderWriterTicketHolder::appendStats(BSONObjBuilder& b) const {
}
}
-void ReaderWriterTicketHolder::_release(AdmissionContext* admCtx) noexcept {
+void ReaderWriterTicketHolder::_releaseImmediateTicket(AdmissionContext* admCtx) noexcept {
switch (admCtx->getLockMode()) {
case MODE_IS:
case MODE_S:
- return _reader->_release(admCtx);
+ return _reader->_releaseImmediateTicket(admCtx);
case MODE_IX:
- return _writer->_release(admCtx);
+ return _writer->_releaseImmediateTicket(admCtx);
+ default:
+ MONGO_UNREACHABLE;
+ }
+}
+
+void ReaderWriterTicketHolder::_releaseToTicketPool(AdmissionContext* admCtx) noexcept {
+ switch (admCtx->getLockMode()) {
+ case MODE_IS:
+ case MODE_S:
+ return _reader->_releaseToTicketPool(admCtx);
+ case MODE_IX:
+ return _writer->_releaseToTicketPool(admCtx);
default:
MONGO_UNREACHABLE;
}
@@ -135,6 +183,15 @@ void ReaderWriterTicketHolder::resizeWriters(int newSize) {
return _writer->resize(newSize);
}
+Ticket TicketHolderWithQueueingStats::acquireImmediateTicket(AdmissionContext* admCtx) {
+ invariant(admCtx->getPriority() == AdmissionContext::Priority::kImmediate);
+ if (recordImmediateTicketStatistics()) {
+ auto& queueStats = _getQueueStatsToUse(admCtx);
+ updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx);
+ }
+ return Ticket{this, admCtx};
+}
+
void TicketHolderWithQueueingStats::resize(int newSize) noexcept {
stdx::lock_guard<Latch> lk(_resizeMutex);
@@ -149,14 +206,17 @@ void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const {
_appendImplStats(b);
}
-void TicketHolderWithQueueingStats::_release(AdmissionContext* admCtx) noexcept {
+void TicketHolderWithQueueingStats::_releaseImmediateTicket(AdmissionContext* admCtx) noexcept {
+ if (recordImmediateTicketStatistics()) {
+ auto& queueStats = _getQueueStatsToUse(admCtx);
+ updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx);
+ }
+}
+
+void TicketHolderWithQueueingStats::_releaseToTicketPool(AdmissionContext* admCtx) noexcept {
auto& queueStats = _getQueueStatsToUse(admCtx);
- queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1);
- auto startTime = admCtx->getStartProcessingTime();
- auto tickSource = _serviceContext->getTickSource();
- auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks());
- queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count());
- _releaseQueue(admCtx);
+ updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx);
+ _releaseToTicketPoolImpl(admCtx);
}
Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx,
@@ -168,17 +228,14 @@ Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx,
}
boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionContext* admCtx) {
- invariant(admCtx);
-
+ // kImmediate operations don't need to 'try' to acquire a ticket, they should always get a
+ // ticket immediately.
+ invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
auto ticket = _tryAcquireImpl(admCtx);
- // Track statistics.
+
if (ticket) {
auto& queueStats = _getQueueStatsToUse(admCtx);
- if (admCtx->getAdmissions() == 0) {
- queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
- }
- admCtx->start(_serviceContext->getTickSource());
- queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
+ updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx);
}
return ticket;
}
@@ -218,11 +275,7 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat
if (ticket) {
cancelWait.dismiss();
- if (admCtx->getAdmissions() == 0) {
- queueStats.totalNewAdmissions.fetchAndAddRelaxed(1);
- }
- admCtx->start(_serviceContext->getTickSource());
- queueStats.totalStartedProcessing.fetchAndAddRelaxed(1);
+ updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx);
return ticket;
} else {
return boost::none;
@@ -329,7 +382,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation
return Ticket{this, admCtx};
}
-void SemaphoreTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept {
+void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
check(sem_post(&_sem));
}
@@ -394,7 +447,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation
return Ticket{this, admCtx};
}
-void SemaphoreTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept {
+void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
{
stdx::lock_guard<Latch> lk(_mutex);
_numTickets++;
@@ -455,8 +508,11 @@ int SchedulingTicketHolder::queued() const {
return _enqueuedElements.loadRelaxed();
}
-void SchedulingTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept {
- invariant(admCtx);
+void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept {
+ // Tickets acquired with priority kImmediate are not generated from the pool of available
+ // tickets, and thus should never be returned to the pool of available tickets.
+ invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate);
+
// The idea behind the release mechanism consists of a consistent view of queued elements
// waiting for a ticket and many threads releasing tickets simultaneously. The releasers will
// proceed to attempt to dequeue an element by seeing if there are threads not woken and waking
@@ -598,10 +654,12 @@ bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx,
}
PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext)
- : SchedulingTicketHolder(numTickets, 2, serviceContext) {}
+ : SchedulingTicketHolder(numTickets, 3, serviceContext) {}
void PriorityTicketHolder::_dequeueWaitingThread() {
- int currentIndexQueue = static_cast<unsigned int>(QueueType::QueueTypeSize) - 1;
+ // There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since
+ // 'kImmediate' operations should always bypass the need to queue.
+ int currentIndexQueue = static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue) - 1;
while (!_queues[currentIndexQueue].attemptToDequeue()) {
if (currentIndexQueue == 0)
break;
@@ -613,16 +671,33 @@ void PriorityTicketHolder::_dequeueWaitingThread() {
void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const {
{
BSONObjBuilder bbb(b.subobjStart("lowPriority"));
- auto& queueStats =
+ auto& lowPriorityTicketStats =
_queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats();
- _appendPriorityStats(bbb, queueStats);
+ _appendPriorityStats(bbb, lowPriorityTicketStats);
bbb.done();
}
{
BSONObjBuilder bbb(b.subobjStart("normalPriority"));
- auto& queueStats =
+ auto& normalPriorityTicketStats =
_queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats();
- _appendPriorityStats(bbb, queueStats);
+ _appendPriorityStats(bbb, normalPriorityTicketStats);
+ bbb.done();
+ }
+ {
+ BSONObjBuilder bbb(b.subobjStart("immediatePriority"));
+ // Since 'kImmediate' priority operations will never queue, omit queueing statistics that
+ // will always be 0.
+ auto& immediateTicketStats =
+ _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)].getStats();
+
+ auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed();
+ auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed();
+ bbb.append("startedProcessing", started);
+ bbb.append("processing", std::max(static_cast<int>(started - finished), 0));
+ bbb.append("finishedProcessing", finished);
+ bbb.append("totalTimeProcessingMicros",
+ immediateTicketStats.totalTimeProcessingMicros.loadRelaxed());
+ bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed());
bbb.done();
}
}
@@ -654,9 +729,11 @@ SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse(
return _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)];
case AdmissionContext::Priority::kNormal:
return _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)];
- default:
- MONGO_UNREACHABLE;
+ case AdmissionContext::Priority::kImmediate:
+ return _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)];
}
+
+ MONGO_UNREACHABLE;
}
TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse(
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index e9846e0173f..60f5d167769 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -71,6 +71,12 @@ public:
static void use(ServiceContext* svcCtx, std::unique_ptr<TicketHolder> newTicketHolder);
/**
+ * Immediately returns a ticket without impacting the number of tickets available. Reserved for
+ * operations that should never be throttled by the ticketing mechanism.
+ */
+ virtual Ticket acquireImmediateTicket(AdmissionContext* admCtx) = 0;
+
+ /**
* Attempts to acquire a ticket without blocking.
* Returns a boolean indicating whether the operation was successful or not.
*/
@@ -98,7 +104,17 @@ public:
virtual void appendStats(BSONObjBuilder& b) const = 0;
private:
- virtual void _release(AdmissionContext* admCtx) noexcept = 0;
+ /**
+ * Restricted for releasing tickets acquired via "acquireImmediateTicket". Handles the release
+ * of an immediate ticket, which should never be reused or returned to the ticketing pool of
+ * available tickets.
+ */
+ virtual void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept = 0;
+
+ /**
+ * Releases a ticket back into the ticketing pool.
+ */
+ virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept = 0;
};
/**
@@ -108,56 +124,40 @@ class TicketHolderWithQueueingStats : public TicketHolder {
friend class ReaderWriterTicketHolder;
public:
- /**
- * Wait mode for ticket acquisition: interruptible or uninterruptible.
- */
TicketHolderWithQueueingStats(int numTickets, ServiceContext* svcCtx)
: _outof(numTickets), _serviceContext(svcCtx){};
~TicketHolderWithQueueingStats() override{};
- /**
- * Attempts to acquire a ticket without blocking.
- * Returns a boolean indicating whether the operation was successful or not.
- */
+ Ticket acquireImmediateTicket(AdmissionContext* admCtx) override final;
+
boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override;
- /**
- * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext
- * 'opCtx' is killed, throwing an AssertionException.
- */
Ticket waitForTicket(OperationContext* opCtx,
AdmissionContext* admCtx,
TicketHolder::WaitMode waitMode) override;
- /**
- * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is
- * acquired and 'false' if the deadline is reached, but the operation is retryable. Throws an
- * AssertionException if the OperationContext 'opCtx' is killed and no waits for tickets can
- * proceed.
- */
boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
TicketHolder::WaitMode waitMode) override;
+ /**
+ * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'.
+ */
void resize(int newSize) noexcept;
- virtual int available() const = 0;
-
virtual int used() const {
return outof() - available();
}
+ /**
+ * The total number of tickets allotted to the ticket pool.
+ */
int outof() const {
return _outof.loadRelaxed();
}
- /**
- * Returns the total number of operations queued - regardles of queueing policy.
- */
- virtual int queued() const = 0;
-
void appendStats(BSONObjBuilder& b) const override;
/**
@@ -176,7 +176,32 @@ public:
AtomicWord<std::int64_t> totalTimeQueuedMicros{0};
};
+ /**
+ * Instantaneous number of operations waiting in queue for a ticket.
+ */
+ virtual int queued() const = 0;
+
+ /**
+ * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket
+ * pool.
+ */
+ virtual int available() const = 0;
+
+ /**
+ * 'Immediate' tickets are acquired and released independent of the ticketing pool and queueing
+ * system. Subclasses must define whether they wish to record statistics surrounding 'immediate'
+ * tickets in addition to standard queueing statistics.
+ *
+ * Returns true if statistics surrounding 'immediate' tickets are to be tracked. False
+ * otherwise.
+ */
+ virtual bool recordImmediateTicketStatistics() = 0;
+
private:
+ void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept final;
+
+ void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final;
+
virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0;
virtual boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
@@ -186,9 +211,7 @@ private:
virtual void _appendImplStats(BSONObjBuilder& b) const = 0;
- void _release(AdmissionContext* admCtx) noexcept override;
-
- virtual void _releaseQueue(AdmissionContext* admCtx) noexcept = 0;
+ virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0;
virtual void _resize(int newSize, int oldSize) noexcept = 0;
@@ -220,26 +243,14 @@ public:
~ReaderWriterTicketHolder() override final;
- /**
- * Attempts to acquire a ticket without blocking.
- * Returns a boolean indicating whether the operation was successful or not.
- */
+ Ticket acquireImmediateTicket(AdmissionContext* admCtx) override final;
+
boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override final;
- /**
- * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext
- * 'opCtx' is killed, throwing an AssertionException.
- */
Ticket waitForTicket(OperationContext* opCtx,
AdmissionContext* admCtx,
WaitMode waitMode) override final;
- /**
- * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is
- * acquired and 'false' if the deadline is reached, but the operation is retryable. Throws an
- * AssertionException if the OperationContext 'opCtx' is killed and no waits for tickets can
- * proceed.
- */
boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx,
AdmissionContext* admCtx,
Date_t until,
@@ -251,7 +262,8 @@ public:
void resizeWriters(int newSize);
private:
- void _release(AdmissionContext* admCtx) noexcept override final;
+ void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept final;
+ void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final;
std::unique_ptr<TicketHolderWithQueueingStats> _reader;
std::unique_ptr<TicketHolderWithQueueingStats> _writer;
@@ -270,6 +282,13 @@ public:
return std::max(static_cast<int>(added - removed), 0);
};
+ bool recordImmediateTicketStatistics() noexcept override final {
+ // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing
+ // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked
+ // in the statistics.
+ return false;
+ }
+
private:
boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx,
AdmissionContext* admCtx,
@@ -277,7 +296,7 @@ private:
WaitMode waitMode) override final;
boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final;
- void _releaseQueue(AdmissionContext* admCtx) noexcept override final;
+ void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
void _appendImplStats(BSONObjBuilder& b) const override final;
@@ -375,6 +394,10 @@ public:
int queued() const override final;
+ bool recordImmediateTicketStatistics() noexcept override final {
+ return true;
+ };
+
private:
bool _tryAcquireTicket();
@@ -385,7 +408,7 @@ private:
Date_t until,
WaitMode waitMode) override final;
- void _releaseQueue(AdmissionContext* admCtx) noexcept override final;
+ void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final;
void _resize(int newSize, int oldSize) noexcept override final;
@@ -426,7 +449,10 @@ private:
enum class QueueType : unsigned int {
LowPriorityQueue = 0,
NormalPriorityQueue = 1,
- QueueTypeSize = 2
+ // Exclusively used for statistics tracking. This queue should never have any processes
+ // 'queued'.
+ ImmediatePriorityNoOpQueue = 2,
+ QueueTypeSize = 3
};
void _dequeueWaitingThread() override final;
@@ -469,7 +495,11 @@ public:
~Ticket() {
if (_ticketholder) {
- _ticketholder->_release(_admissionContext);
+ if (_admissionContext->getPriority() == AdmissionContext::Priority::kImmediate) {
+ _ticketholder->_releaseImmediateTicket(_admissionContext);
+ } else {
+ _ticketholder->_releaseToTicketPool(_admissionContext);
+ }
}
}
@@ -499,5 +529,4 @@ private:
TicketHolder* _ticketholder;
AdmissionContext* _admissionContext;
};
-
} // namespace mongo
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index 779b0777991..7651a66b7f2 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -62,6 +62,27 @@ protected:
ServiceContext::UniqueOperationContext _opCtx;
};
+static inline const Seconds kWaitTimeout{2};
+static inline const Milliseconds kSleepTime{1};
+
+/**
+ * Asserts that eventually the predicate does not throw an exception.
+ */
+void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTimeout) {
+ Timer t;
+ while (true) {
+ try {
+ predicate();
+ break;
+ } catch (...) {
+ if (t.elapsed() >= timeout) {
+ throw;
+ }
+ sleepFor(kSleepTime);
+ }
+ }
+}
+
template <class H>
void basicTimeout(OperationContext* opCtx) {
ServiceContext serviceContext;
@@ -185,7 +206,7 @@ struct MockAdmission {
};
template <class H>
-void resizeTest(OperationContext* opCtx) {
+void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) {
// Verify that resize operations don't alter metrics outside of those linked to the number of
// tickets.
ServiceContext serviceContext;
@@ -195,6 +216,16 @@ void resizeTest(OperationContext* opCtx) {
std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext);
Stats stats(holder.get());
+ // An outstanding kImmediate priority operation should not impact resize statistics.
+ MockAdmission immediatePriorityAdmission("immediatePriorityAdmission",
+ getGlobalServiceContext(),
+ AdmissionContext::Priority::kImmediate);
+ if (testWithOutstandingImmediateOperation) {
+ immediatePriorityAdmission.ticket =
+ holder->acquireImmediateTicket(&immediatePriorityAdmission.admCtx);
+ ASSERT(immediatePriorityAdmission.ticket);
+ }
+
AdmissionContext admCtx;
admCtx.setPriority(AdmissionContext::Priority::kNormal);
@@ -242,6 +273,12 @@ TEST_F(TicketHolderTest, ResizeStatsSemaphore) {
TEST_F(TicketHolderTest, ResizeStatsPriority) {
resizeTest<PriorityTicketHolder>(_opCtx.get());
}
+TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) {
+ resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true);
+}
+TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) {
+ resizeTest<PriorityTicketHolder>(_opCtx.get(), true);
+}
TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
ServiceContext serviceContext;
@@ -318,6 +355,13 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
@@ -414,6 +458,13 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2);
ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0);
ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
TEST_F(TicketHolderTest, PriorityBasicMetrics) {
@@ -454,23 +505,9 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
// Test that the metrics eventually converge to the following set of values. There can be
// cases where the values are incorrect for brief periods of time due to optimistic
// concurrency.
- auto deadline = Date_t::now() + Milliseconds{100};
- while (true) {
- try {
- // ASSERT_EQ(stats["out"], 1);
- ASSERT_EQ(stats["available"], 0);
- // ASSERT_EQ(stats["addedToQueue"], 1);
- // ASSERT_EQ(stats["queueLength"], 1);
- break;
- } catch (...) {
- if (Date_t::now() > deadline) {
- throw;
- }
- // Sleep to allow other threads to process and converge the metrics.
- stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration());
- }
- }
+ assertSoon([&] { ASSERT_EQ(stats["available"], 0); });
}
+
tickSource->advance(Microseconds(100));
lowPriorityAdmission.ticket.reset();
@@ -514,6 +551,13 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0);
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
+
// Retake ticket.
holder.waitForTicket(
_opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible);
@@ -528,6 +572,90 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) {
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1);
}
+TEST_F(TicketHolderTest, PrioritImmediateMetrics) {
+ ServiceContext serviceContext;
+ serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
+ auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource());
+ PriorityTicketHolder holder(1, &serviceContext);
+ Stats stats(&holder);
+
+ MockAdmission lowPriorityAdmission(
+ "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow);
+ lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(),
+ &lowPriorityAdmission.admCtx,
+ TicketHolder::WaitMode::kInterruptible);
+ ASSERT(lowPriorityAdmission.ticket);
+ {
+ // Test that the metrics eventually converge to the following set of values. There can be
+ // cases where the values are incorrect for brief periods of time due to optimistic
+ // concurrency.
+ assertSoon([&] {
+ ASSERT_EQ(stats["available"], 0);
+ ASSERT_EQ(stats["out"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+ });
+ }
+
+ MockAdmission immediatePriorityAdmission("immediatePriorityAdmission",
+ this->getServiceContext(),
+ AdmissionContext::Priority::kImmediate);
+ immediatePriorityAdmission.ticket =
+ holder.acquireImmediateTicket(&immediatePriorityAdmission.admCtx);
+ ASSERT(immediatePriorityAdmission.ticket);
+
+ {
+ // Test that the metrics eventually converge to the following set of values. There can be
+ // cases where the values are incorrect for brief periods of time due to optimistic
+ // concurrency.
+ assertSoon([&]() {
+ // only reported in the priority specific statistics.
+ ASSERT_EQ(stats["available"], 0);
+ ASSERT_EQ(stats["out"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+
+ auto currentStats = stats.getStats();
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1);
+
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ });
+ }
+
+ lowPriorityAdmission.ticket.reset();
+
+ tickSource->advance(Microseconds(200));
+
+ assertSoon([&] {
+ ASSERT_EQ(stats["out"], 0);
+ ASSERT_EQ(stats["available"], 1);
+ ASSERT_EQ(stats["totalTickets"], 1);
+ });
+
+ immediatePriorityAdmission.ticket.reset();
+
+ auto currentStats = stats.getStats();
+ auto lowPriorityStats = currentStats.getObjectField("lowPriority");
+ ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0);
+ ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1);
+ ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 1);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 200);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1);
+}
+
TEST_F(TicketHolderTest, PriorityCanceled) {
ServiceContext serviceContext;
serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>());
@@ -589,5 +717,12 @@ TEST_F(TicketHolderTest, PriorityCanceled) {
ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100);
ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0);
ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1);
+
+ auto immediatePriorityStats = currentStats.getObjectField("immediatePriority");
+ ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0);
+ ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0);
}
} // namespace