diff options
author | Haley Connelly <haley.connelly@mongodb.com> | 2022-09-22 12:13:59 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-09-22 13:42:01 +0000 |
commit | 3c408327dc577ca60c9c1f1c2aa08ffdc4942611 (patch) | |
tree | fcffdb72bb5174c20f26d00b012fb69e17639776 | |
parent | 6dae6fed07e6d99cf23597101bcf7f9716c7f498 (diff) | |
download | mongo-3c408327dc577ca60c9c1f1c2aa08ffdc4942611.tar.gz |
SERVER-67951 Record AdmissionContext::Priority::kImmediate statistics
-rw-r--r-- | src/mongo/db/concurrency/lock_state.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/concurrency/lock_state_test.cpp | 12 | ||||
-rw-r--r-- | src/mongo/db/concurrency/locker.h | 2 | ||||
-rw-r--r-- | src/mongo/util/concurrency/admission_context.h | 12 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 149 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 125 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 169 |
7 files changed, 363 insertions, 118 deletions
diff --git a/src/mongo/db/concurrency/lock_state.cpp b/src/mongo/db/concurrency/lock_state.cpp index 69ea7d13bfd..9a49e3aa462 100644 --- a/src/mongo/db/concurrency/lock_state.cpp +++ b/src/mongo/db/concurrency/lock_state.cpp @@ -368,10 +368,15 @@ void LockerImpl::reacquireTicket(OperationContext* opCtx) { bool LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t deadline) { _admCtx.setLockMode(mode); + + // Upon startup, the holder is not guaranteed to be initialized. + auto holder = _ticketHolder; const bool reader = isSharedLockMode(mode); - auto holder = shouldAcquireTicket() ? _ticketHolder : nullptr; - // MODE_X is exclusive of all other locks, thus acquiring a ticket is unnecessary. - if (mode != MODE_X && mode != MODE_NONE && holder) { + + if (!shouldWaitForTicket() && holder) { + _ticket = holder->acquireImmediateTicket(&_admCtx); + } else if (mode != MODE_X && mode != MODE_NONE && holder) { + // MODE_X is exclusive of all other locks, thus acquiring a ticket is unnecessary. _clientState.store(reader ? kQueuedReader : kQueuedWriter); // If the ticket wait is interrupted, restore the state of the client. ScopeGuard restoreStateOnErrorGuard([&] { @@ -396,6 +401,7 @@ bool LockerImpl::_acquireTicket(OperationContext* opCtx, LockMode mode, Date_t d } restoreStateOnErrorGuard.dismiss(); } + _clientState.store(reader ? kActiveReader : kActiveWriter); return true; } diff --git a/src/mongo/db/concurrency/lock_state_test.cpp b/src/mongo/db/concurrency/lock_state_test.cpp index d5750316327..607c2c076f7 100644 --- a/src/mongo/db/concurrency/lock_state_test.cpp +++ b/src/mongo/db/concurrency/lock_state_test.cpp @@ -1241,27 +1241,27 @@ TEST_F(LockerImplTest, SetTicketAcquisitionForLockRAIIType) { auto opCtx = makeOperationContext(); // By default, ticket acquisition is required. - ASSERT_TRUE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_TRUE(opCtx->lockState()->shouldWaitForTicket()); { SetTicketAquisitionPriorityForLock setTicketAquisition( opCtx.get(), AdmissionContext::Priority::kImmediate); - ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket()); } - ASSERT_TRUE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_TRUE(opCtx->lockState()->shouldWaitForTicket()); // If ticket acquisitions are disabled on the lock state, the RAII type has no effect. opCtx->lockState()->setAdmissionPriority(AdmissionContext::Priority::kImmediate); - ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket()); { SetTicketAquisitionPriorityForLock setTicketAquisition( opCtx.get(), AdmissionContext::Priority::kImmediate); - ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket()); } - ASSERT_FALSE(opCtx->lockState()->shouldAcquireTicket()); + ASSERT_FALSE(opCtx->lockState()->shouldWaitForTicket()); } // This test exercises the lock dumping code in ~LockerImpl in case locks are held on destruction. diff --git a/src/mongo/db/concurrency/locker.h b/src/mongo/db/concurrency/locker.h index d29774e39a8..0f3e760b54d 100644 --- a/src/mongo/db/concurrency/locker.h +++ b/src/mongo/db/concurrency/locker.h @@ -521,7 +521,7 @@ public: return _admCtx.getPriority(); } - bool shouldAcquireTicket() const { + bool shouldWaitForTicket() const { return _admCtx.getPriority() != AdmissionContext::Priority::kImmediate; } diff --git a/src/mongo/util/concurrency/admission_context.h b/src/mongo/util/concurrency/admission_context.h index 304a57dc434..8eddb3cf657 100644 --- a/src/mongo/util/concurrency/admission_context.h +++ b/src/mongo/util/concurrency/admission_context.h @@ -57,13 +57,11 @@ public: * user and internal, should use this priority unless they qualify as 'kLow' or 'kImmediate' * priority. * - * 'kImmediate': It's crucial that the operation makes forward progress - bypassing ticket - * acquisition. Reserved for operations critical to availability (e.g. replication workers) or - * observability (e.g. FTDC), and any operation that is releasing resources (e.g. committing or - * aborting prepared transactions). Should be used sparingly. - * - * TODO SERVER-67951: Update comment to address that kImmediate priority operations are always - * granted a ticket immediately upon request. + * 'kImmediate': It's crucial that the operation makes forward progress - and acquire a ticket + * immediately upon request, without waiting. + * Reserved for operations critical to availability (e.g. replication workers) or observability + * (e.g. FTDC), and any operation that is releasing resources (e.g. committing or aborting + * prepared transactions). Should be used sparingly. */ enum class Priority { kLow, kNormal, kImmediate }; diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index b5663a65b54..d910f292129 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -41,12 +41,33 @@ #define MONGO_LOGV2_DEFAULT_COMPONENT ::mongo::logv2::LogComponent::kDefault +namespace mongo { + namespace { const auto ticketHolderDecoration = mongo::ServiceContext::declareDecoration<std::unique_ptr<mongo::TicketHolder>>(); + +void updateQueueStatsOnRelease(ServiceContext* serviceContext, + TicketHolderWithQueueingStats::QueueStats& queueStats, + AdmissionContext* admCtx) { + queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1); + auto startTime = admCtx->getStartProcessingTime(); + auto tickSource = serviceContext->getTickSource(); + auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks()); + queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count()); } -namespace mongo { +void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, + TicketHolderWithQueueingStats::QueueStats& queueStats, + AdmissionContext* admCtx) { + if (admCtx->getAdmissions() == 0) { + queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); + } + admCtx->start(serviceContext->getTickSource()); + queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); +} + +} // namespace TicketHolder* TicketHolder::get(ServiceContext* svcCtx) { return ticketHolderDecoration(svcCtx).get(); @@ -58,6 +79,21 @@ void TicketHolder::use(ServiceContext* svcCtx, std::unique_ptr<TicketHolder> new ReaderWriterTicketHolder::~ReaderWriterTicketHolder(){}; +Ticket ReaderWriterTicketHolder::acquireImmediateTicket(AdmissionContext* admCtx) { + switch (admCtx->getLockMode()) { + case MODE_IS: + case MODE_S: + return _reader->acquireImmediateTicket(admCtx); + case MODE_IX: + return _writer->acquireImmediateTicket(admCtx); + default: + // Tickets are linked to the GlobalLock and a MODE_X lock is already exclusive - all + // other operations waiting for MODE_X will be blocked so no need to go through the + // ticketing mechanism. + MONGO_UNREACHABLE; + } +} + boost::optional<Ticket> ReaderWriterTicketHolder::tryAcquire(AdmissionContext* admCtx) { switch (admCtx->getLockMode()) { @@ -115,13 +151,25 @@ void ReaderWriterTicketHolder::appendStats(BSONObjBuilder& b) const { } } -void ReaderWriterTicketHolder::_release(AdmissionContext* admCtx) noexcept { +void ReaderWriterTicketHolder::_releaseImmediateTicket(AdmissionContext* admCtx) noexcept { switch (admCtx->getLockMode()) { case MODE_IS: case MODE_S: - return _reader->_release(admCtx); + return _reader->_releaseImmediateTicket(admCtx); case MODE_IX: - return _writer->_release(admCtx); + return _writer->_releaseImmediateTicket(admCtx); + default: + MONGO_UNREACHABLE; + } +} + +void ReaderWriterTicketHolder::_releaseToTicketPool(AdmissionContext* admCtx) noexcept { + switch (admCtx->getLockMode()) { + case MODE_IS: + case MODE_S: + return _reader->_releaseToTicketPool(admCtx); + case MODE_IX: + return _writer->_releaseToTicketPool(admCtx); default: MONGO_UNREACHABLE; } @@ -135,6 +183,15 @@ void ReaderWriterTicketHolder::resizeWriters(int newSize) { return _writer->resize(newSize); } +Ticket TicketHolderWithQueueingStats::acquireImmediateTicket(AdmissionContext* admCtx) { + invariant(admCtx->getPriority() == AdmissionContext::Priority::kImmediate); + if (recordImmediateTicketStatistics()) { + auto& queueStats = _getQueueStatsToUse(admCtx); + updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx); + } + return Ticket{this, admCtx}; +} + void TicketHolderWithQueueingStats::resize(int newSize) noexcept { stdx::lock_guard<Latch> lk(_resizeMutex); @@ -149,14 +206,17 @@ void TicketHolderWithQueueingStats::appendStats(BSONObjBuilder& b) const { _appendImplStats(b); } -void TicketHolderWithQueueingStats::_release(AdmissionContext* admCtx) noexcept { +void TicketHolderWithQueueingStats::_releaseImmediateTicket(AdmissionContext* admCtx) noexcept { + if (recordImmediateTicketStatistics()) { + auto& queueStats = _getQueueStatsToUse(admCtx); + updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx); + } +} + +void TicketHolderWithQueueingStats::_releaseToTicketPool(AdmissionContext* admCtx) noexcept { auto& queueStats = _getQueueStatsToUse(admCtx); - queueStats.totalFinishedProcessing.fetchAndAddRelaxed(1); - auto startTime = admCtx->getStartProcessingTime(); - auto tickSource = _serviceContext->getTickSource(); - auto delta = tickSource->spanTo<Microseconds>(startTime, tickSource->getTicks()); - queueStats.totalTimeProcessingMicros.fetchAndAddRelaxed(delta.count()); - _releaseQueue(admCtx); + updateQueueStatsOnRelease(_serviceContext, queueStats, admCtx); + _releaseToTicketPoolImpl(admCtx); } Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx, @@ -168,17 +228,14 @@ Ticket TicketHolderWithQueueingStats::waitForTicket(OperationContext* opCtx, } boost::optional<Ticket> TicketHolderWithQueueingStats::tryAcquire(AdmissionContext* admCtx) { - invariant(admCtx); - + // kImmediate operations don't need to 'try' to acquire a ticket, they should always get a + // ticket immediately. + invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); auto ticket = _tryAcquireImpl(admCtx); - // Track statistics. + if (ticket) { auto& queueStats = _getQueueStatsToUse(admCtx); - if (admCtx->getAdmissions() == 0) { - queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); - } - admCtx->start(_serviceContext->getTickSource()); - queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); + updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx); } return ticket; } @@ -218,11 +275,7 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat if (ticket) { cancelWait.dismiss(); - if (admCtx->getAdmissions() == 0) { - queueStats.totalNewAdmissions.fetchAndAddRelaxed(1); - } - admCtx->start(_serviceContext->getTickSource()); - queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); + updateQueueStatsOnTicketAcquisition(_serviceContext, queueStats, admCtx); return ticket; } else { return boost::none; @@ -329,7 +382,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation return Ticket{this, admCtx}; } -void SemaphoreTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept { +void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { check(sem_post(&_sem)); } @@ -394,7 +447,7 @@ boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(Operation return Ticket{this, admCtx}; } -void SemaphoreTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept { +void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { { stdx::lock_guard<Latch> lk(_mutex); _numTickets++; @@ -455,8 +508,11 @@ int SchedulingTicketHolder::queued() const { return _enqueuedElements.loadRelaxed(); } -void SchedulingTicketHolder::_releaseQueue(AdmissionContext* admCtx) noexcept { - invariant(admCtx); +void SchedulingTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { + // Tickets acquired with priority kImmediate are not generated from the pool of available + // tickets, and thus should never be returned to the pool of available tickets. + invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); + // The idea behind the release mechanism consists of a consistent view of queued elements // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking @@ -598,10 +654,12 @@ bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx, } PriorityTicketHolder::PriorityTicketHolder(int numTickets, ServiceContext* serviceContext) - : SchedulingTicketHolder(numTickets, 2, serviceContext) {} + : SchedulingTicketHolder(numTickets, 3, serviceContext) {} void PriorityTicketHolder::_dequeueWaitingThread() { - int currentIndexQueue = static_cast<unsigned int>(QueueType::QueueTypeSize) - 1; + // There should never be anything to dequeue from 'QueueType::ImmediatePriorityNoOpQueue' since + // 'kImmediate' operations should always bypass the need to queue. + int currentIndexQueue = static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue) - 1; while (!_queues[currentIndexQueue].attemptToDequeue()) { if (currentIndexQueue == 0) break; @@ -613,16 +671,33 @@ void PriorityTicketHolder::_dequeueWaitingThread() { void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { { BSONObjBuilder bbb(b.subobjStart("lowPriority")); - auto& queueStats = + auto& lowPriorityTicketStats = _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)].getStats(); - _appendPriorityStats(bbb, queueStats); + _appendPriorityStats(bbb, lowPriorityTicketStats); bbb.done(); } { BSONObjBuilder bbb(b.subobjStart("normalPriority")); - auto& queueStats = + auto& normalPriorityTicketStats = _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)].getStats(); - _appendPriorityStats(bbb, queueStats); + _appendPriorityStats(bbb, normalPriorityTicketStats); + bbb.done(); + } + { + BSONObjBuilder bbb(b.subobjStart("immediatePriority")); + // Since 'kImmediate' priority operations will never queue, omit queueing statistics that + // will always be 0. + auto& immediateTicketStats = + _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)].getStats(); + + auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed(); + auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed(); + bbb.append("startedProcessing", started); + bbb.append("processing", std::max(static_cast<int>(started - finished), 0)); + bbb.append("finishedProcessing", finished); + bbb.append("totalTimeProcessingMicros", + immediateTicketStats.totalTimeProcessingMicros.loadRelaxed()); + bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed()); bbb.done(); } } @@ -654,9 +729,11 @@ SchedulingTicketHolder::Queue& PriorityTicketHolder::_getQueueToUse( return _queues[static_cast<unsigned int>(QueueType::LowPriorityQueue)]; case AdmissionContext::Priority::kNormal: return _queues[static_cast<unsigned int>(QueueType::NormalPriorityQueue)]; - default: - MONGO_UNREACHABLE; + case AdmissionContext::Priority::kImmediate: + return _queues[static_cast<unsigned int>(QueueType::ImmediatePriorityNoOpQueue)]; } + + MONGO_UNREACHABLE; } TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index e9846e0173f..60f5d167769 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -71,6 +71,12 @@ public: static void use(ServiceContext* svcCtx, std::unique_ptr<TicketHolder> newTicketHolder); /** + * Immediately returns a ticket without impacting the number of tickets available. Reserved for + * operations that should never be throttled by the ticketing mechanism. + */ + virtual Ticket acquireImmediateTicket(AdmissionContext* admCtx) = 0; + + /** * Attempts to acquire a ticket without blocking. * Returns a boolean indicating whether the operation was successful or not. */ @@ -98,7 +104,17 @@ public: virtual void appendStats(BSONObjBuilder& b) const = 0; private: - virtual void _release(AdmissionContext* admCtx) noexcept = 0; + /** + * Restricted for releasing tickets acquired via "acquireImmediateTicket". Handles the release + * of an immediate ticket, which should never be reused or returned to the ticketing pool of + * available tickets. + */ + virtual void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept = 0; + + /** + * Releases a ticket back into the ticketing pool. + */ + virtual void _releaseToTicketPool(AdmissionContext* admCtx) noexcept = 0; }; /** @@ -108,56 +124,40 @@ class TicketHolderWithQueueingStats : public TicketHolder { friend class ReaderWriterTicketHolder; public: - /** - * Wait mode for ticket acquisition: interruptible or uninterruptible. - */ TicketHolderWithQueueingStats(int numTickets, ServiceContext* svcCtx) : _outof(numTickets), _serviceContext(svcCtx){}; ~TicketHolderWithQueueingStats() override{}; - /** - * Attempts to acquire a ticket without blocking. - * Returns a boolean indicating whether the operation was successful or not. - */ + Ticket acquireImmediateTicket(AdmissionContext* admCtx) override final; + boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override; - /** - * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext - * 'opCtx' is killed, throwing an AssertionException. - */ Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx, TicketHolder::WaitMode waitMode) override; - /** - * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is - * acquired and 'false' if the deadline is reached, but the operation is retryable. Throws an - * AssertionException if the OperationContext 'opCtx' is killed and no waits for tickets can - * proceed. - */ boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, AdmissionContext* admCtx, Date_t until, TicketHolder::WaitMode waitMode) override; + /** + * Adjusts the total number of tickets allocated for the ticket pool to 'newSize'. + */ void resize(int newSize) noexcept; - virtual int available() const = 0; - virtual int used() const { return outof() - available(); } + /** + * The total number of tickets allotted to the ticket pool. + */ int outof() const { return _outof.loadRelaxed(); } - /** - * Returns the total number of operations queued - regardles of queueing policy. - */ - virtual int queued() const = 0; - void appendStats(BSONObjBuilder& b) const override; /** @@ -176,7 +176,32 @@ public: AtomicWord<std::int64_t> totalTimeQueuedMicros{0}; }; + /** + * Instantaneous number of operations waiting in queue for a ticket. + */ + virtual int queued() const = 0; + + /** + * Instantaneous number of tickets 'available' (not checked out by an operation) in the ticket + * pool. + */ + virtual int available() const = 0; + + /** + * 'Immediate' tickets are acquired and released independent of the ticketing pool and queueing + * system. Subclasses must define whether they wish to record statistics surrounding 'immediate' + * tickets in addition to standard queueing statistics. + * + * Returns true if statistics surrounding 'immediate' tickets are to be tracked. False + * otherwise. + */ + virtual bool recordImmediateTicketStatistics() = 0; + private: + void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept final; + + void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final; + virtual boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) = 0; virtual boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, @@ -186,9 +211,7 @@ private: virtual void _appendImplStats(BSONObjBuilder& b) const = 0; - void _release(AdmissionContext* admCtx) noexcept override; - - virtual void _releaseQueue(AdmissionContext* admCtx) noexcept = 0; + virtual void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept = 0; virtual void _resize(int newSize, int oldSize) noexcept = 0; @@ -220,26 +243,14 @@ public: ~ReaderWriterTicketHolder() override final; - /** - * Attempts to acquire a ticket without blocking. - * Returns a boolean indicating whether the operation was successful or not. - */ + Ticket acquireImmediateTicket(AdmissionContext* admCtx) override final; + boost::optional<Ticket> tryAcquire(AdmissionContext* admCtx) override final; - /** - * Attempts to acquire a ticket. Blocks until a ticket is acquired or the OperationContext - * 'opCtx' is killed, throwing an AssertionException. - */ Ticket waitForTicket(OperationContext* opCtx, AdmissionContext* admCtx, WaitMode waitMode) override final; - /** - * Attempts to acquire a ticket within a deadline, 'until'. Returns 'true' if a ticket is - * acquired and 'false' if the deadline is reached, but the operation is retryable. Throws an - * AssertionException if the OperationContext 'opCtx' is killed and no waits for tickets can - * proceed. - */ boost::optional<Ticket> waitForTicketUntil(OperationContext* opCtx, AdmissionContext* admCtx, Date_t until, @@ -251,7 +262,8 @@ public: void resizeWriters(int newSize); private: - void _release(AdmissionContext* admCtx) noexcept override final; + void _releaseImmediateTicket(AdmissionContext* admCtx) noexcept final; + void _releaseToTicketPool(AdmissionContext* admCtx) noexcept override final; std::unique_ptr<TicketHolderWithQueueingStats> _reader; std::unique_ptr<TicketHolderWithQueueingStats> _writer; @@ -270,6 +282,13 @@ public: return std::max(static_cast<int>(added - removed), 0); }; + bool recordImmediateTicketStatistics() noexcept override final { + // Historically, operations that now acquire 'immediate' tickets bypassed the ticketing + // mechanism completely. Preserve legacy behavior where 'immediate' ticketing is not tracked + // in the statistics. + return false; + } + private: boost::optional<Ticket> _waitForTicketUntilImpl(OperationContext* opCtx, AdmissionContext* admCtx, @@ -277,7 +296,7 @@ private: WaitMode waitMode) override final; boost::optional<Ticket> _tryAcquireImpl(AdmissionContext* admCtx) override final; - void _releaseQueue(AdmissionContext* admCtx) noexcept override final; + void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; void _appendImplStats(BSONObjBuilder& b) const override final; @@ -375,6 +394,10 @@ public: int queued() const override final; + bool recordImmediateTicketStatistics() noexcept override final { + return true; + }; + private: bool _tryAcquireTicket(); @@ -385,7 +408,7 @@ private: Date_t until, WaitMode waitMode) override final; - void _releaseQueue(AdmissionContext* admCtx) noexcept override final; + void _releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept override final; void _resize(int newSize, int oldSize) noexcept override final; @@ -426,7 +449,10 @@ private: enum class QueueType : unsigned int { LowPriorityQueue = 0, NormalPriorityQueue = 1, - QueueTypeSize = 2 + // Exclusively used for statistics tracking. This queue should never have any processes + // 'queued'. + ImmediatePriorityNoOpQueue = 2, + QueueTypeSize = 3 }; void _dequeueWaitingThread() override final; @@ -469,7 +495,11 @@ public: ~Ticket() { if (_ticketholder) { - _ticketholder->_release(_admissionContext); + if (_admissionContext->getPriority() == AdmissionContext::Priority::kImmediate) { + _ticketholder->_releaseImmediateTicket(_admissionContext); + } else { + _ticketholder->_releaseToTicketPool(_admissionContext); + } } } @@ -499,5 +529,4 @@ private: TicketHolder* _ticketholder; AdmissionContext* _admissionContext; }; - } // namespace mongo diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index 779b0777991..7651a66b7f2 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -62,6 +62,27 @@ protected: ServiceContext::UniqueOperationContext _opCtx; }; +static inline const Seconds kWaitTimeout{2}; +static inline const Milliseconds kSleepTime{1}; + +/** + * Asserts that eventually the predicate does not throw an exception. + */ +void assertSoon(std::function<void()> predicate, Milliseconds timeout = kWaitTimeout) { + Timer t; + while (true) { + try { + predicate(); + break; + } catch (...) { + if (t.elapsed() >= timeout) { + throw; + } + sleepFor(kSleepTime); + } + } +} + template <class H> void basicTimeout(OperationContext* opCtx) { ServiceContext serviceContext; @@ -185,7 +206,7 @@ struct MockAdmission { }; template <class H> -void resizeTest(OperationContext* opCtx) { +void resizeTest(OperationContext* opCtx, bool testWithOutstandingImmediateOperation = false) { // Verify that resize operations don't alter metrics outside of those linked to the number of // tickets. ServiceContext serviceContext; @@ -195,6 +216,16 @@ void resizeTest(OperationContext* opCtx) { std::unique_ptr<TicketHolderWithQueueingStats> holder = std::make_unique<H>(1, &serviceContext); Stats stats(holder.get()); + // An outstanding kImmediate priority operation should not impact resize statistics. + MockAdmission immediatePriorityAdmission("immediatePriorityAdmission", + getGlobalServiceContext(), + AdmissionContext::Priority::kImmediate); + if (testWithOutstandingImmediateOperation) { + immediatePriorityAdmission.ticket = + holder->acquireImmediateTicket(&immediatePriorityAdmission.admCtx); + ASSERT(immediatePriorityAdmission.ticket); + } + AdmissionContext admCtx; admCtx.setPriority(AdmissionContext::Priority::kNormal); @@ -242,6 +273,12 @@ TEST_F(TicketHolderTest, ResizeStatsSemaphore) { TEST_F(TicketHolderTest, ResizeStatsPriority) { resizeTest<PriorityTicketHolder>(_opCtx.get()); } +TEST_F(TicketHolderTest, ResizeStatsSemaphoreWithOutstandingImmediatePriority) { + resizeTest<SemaphoreTicketHolder>(_opCtx.get(), true); +} +TEST_F(TicketHolderTest, ResizeStatsPriorityWithOutstandingImmediatePriority) { + resizeTest<PriorityTicketHolder>(_opCtx.get(), true); +} TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { ServiceContext serviceContext; @@ -318,6 +355,13 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 1); ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { @@ -414,6 +458,13 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { ASSERT_EQ(normalPriorityStats.getIntField("startedProcessing"), 2); ASSERT_EQ(normalPriorityStats.getIntField("processing"), 0); ASSERT_EQ(normalPriorityStats.getIntField("finishedProcessing"), 2); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } TEST_F(TicketHolderTest, PriorityBasicMetrics) { @@ -454,23 +505,9 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { // Test that the metrics eventually converge to the following set of values. There can be // cases where the values are incorrect for brief periods of time due to optimistic // concurrency. - auto deadline = Date_t::now() + Milliseconds{100}; - while (true) { - try { - // ASSERT_EQ(stats["out"], 1); - ASSERT_EQ(stats["available"], 0); - // ASSERT_EQ(stats["addedToQueue"], 1); - // ASSERT_EQ(stats["queueLength"], 1); - break; - } catch (...) { - if (Date_t::now() > deadline) { - throw; - } - // Sleep to allow other threads to process and converge the metrics. - stdx::this_thread::sleep_for(Milliseconds{1}.toSystemDuration()); - } - } + assertSoon([&] { ASSERT_EQ(stats["available"], 0); }); } + tickSource->advance(Microseconds(100)); lowPriorityAdmission.ticket.reset(); @@ -514,6 +551,13 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 0); + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); + // Retake ticket. holder.waitForTicket( _opCtx.get(), &lowPriorityAdmission.admCtx, TicketHolder::WaitMode::kInterruptible); @@ -528,6 +572,90 @@ TEST_F(TicketHolderTest, PriorityBasicMetrics) { ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 1); } +TEST_F(TicketHolderTest, PrioritImmediateMetrics) { + ServiceContext serviceContext; + serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); + auto tickSource = dynamic_cast<TickSourceMock<Microseconds>*>(serviceContext.getTickSource()); + PriorityTicketHolder holder(1, &serviceContext); + Stats stats(&holder); + + MockAdmission lowPriorityAdmission( + "lowPriorityAdmission", this->getServiceContext(), AdmissionContext::Priority::kLow); + lowPriorityAdmission.ticket = holder.waitForTicket(lowPriorityAdmission.opCtx.get(), + &lowPriorityAdmission.admCtx, + TicketHolder::WaitMode::kInterruptible); + ASSERT(lowPriorityAdmission.ticket); + { + // Test that the metrics eventually converge to the following set of values. There can be + // cases where the values are incorrect for brief periods of time due to optimistic + // concurrency. + assertSoon([&] { + ASSERT_EQ(stats["available"], 0); + ASSERT_EQ(stats["out"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + }); + } + + MockAdmission immediatePriorityAdmission("immediatePriorityAdmission", + this->getServiceContext(), + AdmissionContext::Priority::kImmediate); + immediatePriorityAdmission.ticket = + holder.acquireImmediateTicket(&immediatePriorityAdmission.admCtx); + ASSERT(immediatePriorityAdmission.ticket); + + { + // Test that the metrics eventually converge to the following set of values. There can be + // cases where the values are incorrect for brief periods of time due to optimistic + // concurrency. + assertSoon([&]() { + // only reported in the priority specific statistics. + ASSERT_EQ(stats["available"], 0); + ASSERT_EQ(stats["out"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + + auto currentStats = stats.getStats(); + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1); + + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + }); + } + + lowPriorityAdmission.ticket.reset(); + + tickSource->advance(Microseconds(200)); + + assertSoon([&] { + ASSERT_EQ(stats["out"], 0); + ASSERT_EQ(stats["available"], 1); + ASSERT_EQ(stats["totalTickets"], 1); + }); + + immediatePriorityAdmission.ticket.reset(); + + auto currentStats = stats.getStats(); + auto lowPriorityStats = currentStats.getObjectField("lowPriority"); + ASSERT_EQ(lowPriorityStats.getIntField("addedToQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("removedFromQueue"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("queueLength"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("processing"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("totalTimeQueuedMicros"), 0); + ASSERT_EQ(lowPriorityStats.getIntField("newAdmissions"), 1); + ASSERT_EQ(lowPriorityStats.getIntField("canceled"), 0); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 1); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 200); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 1); +} + TEST_F(TicketHolderTest, PriorityCanceled) { ServiceContext serviceContext; serviceContext.setTickSource(std::make_unique<TickSourceMock<Microseconds>>()); @@ -589,5 +717,12 @@ TEST_F(TicketHolderTest, PriorityCanceled) { ASSERT_EQ(normalPriorityStats.getIntField("totalTimeQueuedMicros"), 100); ASSERT_EQ(normalPriorityStats.getIntField("newAdmissions"), 0); ASSERT_EQ(normalPriorityStats.getIntField("canceled"), 1); + + auto immediatePriorityStats = currentStats.getObjectField("immediatePriority"); + ASSERT_EQ(immediatePriorityStats.getIntField("startedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("processing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("finishedProcessing"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("totalTimeProcessingMicros"), 0); + ASSERT_EQ(immediatePriorityStats.getIntField("newAdmissions"), 0); } } // namespace |