diff options
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 424 |
1 files changed, 16 insertions, 408 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index eafed4da466..33e8c05b2c5 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -63,29 +63,6 @@ void updateQueueStatsOnTicketAcquisition(ServiceContext* serviceContext, admCtx->start(serviceContext->getTickSource()); queueStats.totalStartedProcessing.fetchAndAddRelaxed(1); } - -/** - * Appends the standard statistics stored in QueueStats to BSONObjBuilder b; - */ -void appendCommonQueueImplStats(BSONObjBuilder& b, - const TicketHolderWithQueueingStats::QueueStats& stats) { - auto removed = stats.totalRemovedQueue.loadRelaxed(); - auto added = stats.totalAddedQueue.loadRelaxed(); - - b.append("addedToQueue", added); - b.append("removedFromQueue", removed); - b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); - - auto finished = stats.totalFinishedProcessing.loadRelaxed(); - auto started = stats.totalStartedProcessing.loadRelaxed(); - b.append("startedProcessing", started); - b.append("processing", std::max(static_cast<int>(started - finished), 0)); - b.append("finishedProcessing", finished); - b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); - b.append("canceled", stats.totalCanceled.loadRelaxed()); - b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); - b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); -} } // namespace Ticket TicketHolderWithQueueingStats::acquireImmediateTicket(AdmissionContext* admCtx) { @@ -187,393 +164,24 @@ boost::optional<Ticket> TicketHolderWithQueueingStats::waitForTicketUntil(Operat } } -void SemaphoreTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - appendCommonQueueImplStats(b, _semaphoreStats); -} -#if defined(__linux__) -namespace { - -/** - * Accepts an errno code, prints its error message, and exits. - */ -void failWithErrno(int err) { - LOGV2_FATAL(28604, - "error in Ticketholder: {errnoWithDescription_err}", - "errnoWithDescription_err"_attr = errorMessage(posixError(err))); -} - -/* - * Checks the return value from a Linux semaphore function call, and fails with the set errno if the - * call was unsucessful. - */ -void check(int ret) { - if (ret == 0) - return; - failWithErrno(errno); -} - -/** - * Takes a Date_t deadline and sets the appropriate values in a timespec structure. - */ -void tsFromDate(const Date_t& deadline, struct timespec& ts) { - ts.tv_sec = deadline.toTimeT(); - ts.tv_nsec = (deadline.toMillisSinceEpoch() % 1000) * 1'000'000; -} -} // namespace - -SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext) { - check(sem_init(&_sem, 0, numTickets)); -} - -SemaphoreTicketHolder::~SemaphoreTicketHolder() { - check(sem_destroy(&_sem)); -} - -boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - while (0 != sem_trywait(&_sem)) { - if (errno == EAGAIN) - return boost::none; - if (errno != EINTR) - failWithErrno(errno); - } - return Ticket{this, admCtx}; -} - -boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - const Milliseconds intervalMs(500); - struct timespec ts; - - // To support interrupting ticket acquisition while still benefiting from semaphores, we do a - // timed wait on an interval to periodically check for interrupts. - // The wait period interval is the smaller of the default interval and the provided - // deadline. - Date_t deadline = std::min(until, Date_t::now() + intervalMs); - tsFromDate(deadline, ts); - - while (0 != sem_timedwait(&_sem, &ts)) { - if (errno == ETIMEDOUT) { - // If we reached the deadline without being interrupted, we have completely timed out. - if (deadline == until) - return boost::none; - - deadline = std::min(until, Date_t::now() + intervalMs); - tsFromDate(deadline, ts); - } else if (errno != EINTR) { - failWithErrno(errno); - } - - // To correctly handle errors from sem_timedwait, we should check for interrupts last. - // It is possible to unset 'errno' after a call to checkForInterrupt(). - if (waitMode == WaitMode::kInterruptible) - opCtx->checkForInterrupt(); - } - return Ticket{this, admCtx}; -} - -void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - check(sem_post(&_sem)); -} - -int SemaphoreTicketHolder::available() const { - int val = 0; - check(sem_getvalue(&_sem, &val)); - return val; -} - -void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - if (difference > 0) { - for (int i = 0; i < difference; i++) { - check(sem_post(&_sem)); - } - } else if (difference < 0) { - for (int i = 0; i < -difference; i++) { - check(sem_wait(&_sem)); - } - } -} - -#else - -SemaphoreTicketHolder::SemaphoreTicketHolder(int numTickets, ServiceContext* svcCtx) - : TicketHolderWithQueueingStats(numTickets, svcCtx), _numTickets(numTickets) {} - -SemaphoreTicketHolder::~SemaphoreTicketHolder() = default; - -boost::optional<Ticket> SemaphoreTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - stdx::lock_guard<Latch> lk(_mutex); - if (!_tryAcquire()) { - return boost::none; - } - return Ticket{this, admCtx}; -} - -boost::optional<Ticket> SemaphoreTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - stdx::unique_lock<Latch> lk(_mutex); - - bool taken = [&] { - if (waitMode == WaitMode::kInterruptible) { - return opCtx->waitForConditionOrInterruptUntil( - _newTicket, lk, until, [this] { return _tryAcquire(); }); - } else { - if (until == Date_t::max()) { - _newTicket.wait(lk, [this] { return _tryAcquire(); }); - return true; - } else { - return _newTicket.wait_until( - lk, until.toSystemTimePoint(), [this] { return _tryAcquire(); }); - } - } - }(); - if (!taken) { - return boost::none; - } - return Ticket{this, admCtx}; -} - -void SemaphoreTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - { - stdx::lock_guard<Latch> lk(_mutex); - _numTickets++; - } - _newTicket.notify_one(); -} - -int SemaphoreTicketHolder::available() const { - return _numTickets; -} - -bool SemaphoreTicketHolder::_tryAcquire() { - if (_numTickets <= 0) { - if (_numTickets < 0) { - std::cerr << "DISASTER! in TicketHolder" << std::endl; - } - return false; - } - _numTickets--; - return true; -} - -void SemaphoreTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - stdx::lock_guard<Latch> lk(_mutex); - _numTickets += difference; - - if (difference > 0) { - for (int i = 0; i < difference; i++) { - _newTicket.notify_one(); - } - } - // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll - // have to wait until the current ticket holders release their tickets. -} -#endif - -PriorityTicketHolder::PriorityTicketHolder(int numTickets, - int lowPriorityBypassThreshold, - ServiceContext* serviceContext) - : TicketHolderWithQueueingStats(numTickets, serviceContext), - _lowPriorityBypassThreshold(lowPriorityBypassThreshold), - _serviceContext(serviceContext) { - _ticketsAvailable.store(numTickets); - _enqueuedElements.store(0); -} - -void PriorityTicketHolder::updateLowPriorityAdmissionBypassThreshold( - const int& newBypassThreshold) { - ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); - _lowPriorityBypassThreshold = newBypassThreshold; -} - -boost::optional<Ticket> PriorityTicketHolder::_tryAcquireImpl(AdmissionContext* admCtx) { - invariant(admCtx); - // Low priority operations cannot use optimistic ticket acquisition and will go to the queue - // instead. This is done to prevent them from skipping the line before other high-priority - // operations. - if (admCtx->getPriority() >= AdmissionContext::Priority::kNormal) { - auto hasAcquired = _tryAcquireTicket(); - if (hasAcquired) { - return Ticket{this, admCtx}; - } - } - return boost::none; -} - -boost::optional<Ticket> PriorityTicketHolder::_waitForTicketUntilImpl(OperationContext* opCtx, - AdmissionContext* admCtx, - Date_t until, - WaitMode waitMode) { - invariant(admCtx); - - auto queueType = _getQueueType(admCtx); - auto& queue = _getQueue(queueType); - - bool interruptible = waitMode == WaitMode::kInterruptible; - - _enqueuedElements.addAndFetch(1); - ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - - ticket_queues::UniqueLockGuard uniqueQueueLock(_queueMutex); - do { - while (_ticketsAvailable.load() <= 0 || - _hasToWaitForHigherPriority(uniqueQueueLock, queueType)) { - bool hasTimedOut = !queue.enqueue(uniqueQueueLock, opCtx, until, interruptible); - if (hasTimedOut) { - return boost::none; - } - } - } while (!_tryAcquireTicket()); - return Ticket{this, admCtx}; -} - -void PriorityTicketHolder::_releaseToTicketPoolImpl(AdmissionContext* admCtx) noexcept { - // Tickets acquired with priority kImmediate are not generated from the pool of available - // tickets, and thus should never be returned to the pool of available tickets. - invariant(admCtx && admCtx->getPriority() != AdmissionContext::Priority::kImmediate); - - // The idea behind the release mechanism consists of a consistent view of queued elements - // waiting for a ticket and many threads releasing tickets simultaneously. The releasers will - // proceed to attempt to dequeue an element by seeing if there are threads not woken and waking - // one, having increased the number of woken threads for accuracy. Once the thread gets woken it - // will then decrease the number of woken threads (as it has been woken) and then attempt to - // acquire a ticket. The two possible states are either one or more releasers releasing or a - // thread waking up due to the RW mutex. - // - // Under this lock the queues cannot be modified in terms of someone attempting to enqueue on - // them, only waking threads is allowed. - ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); - _ticketsAvailable.addAndFetch(1); - _dequeueWaitingThread(sharedQueueLock); -} - -void PriorityTicketHolder::_resize(int newSize, int oldSize) noexcept { - auto difference = newSize - oldSize; - - _ticketsAvailable.fetchAndAdd(difference); - - if (difference > 0) { - // As we're adding tickets the waiting threads need to be notified that there are new - // tickets available. - ticket_queues::SharedLockGuard sharedQueueLock(_queueMutex); - for (int i = 0; i < difference; i++) { - _dequeueWaitingThread(sharedQueueLock); - } - } - - // No need to do anything in the other cases as the number of tickets being <= 0 implies they'll - // have to wait until the current ticket holders release their tickets. -} - -TicketHolderWithQueueingStats::QueueStats& PriorityTicketHolder::_getQueueStatsToUse( - const AdmissionContext* admCtx) noexcept { - auto queueType = _getQueueType(admCtx); - return _stats[_enumToInt(queueType)]; -} - -void PriorityTicketHolder::_appendImplStats(BSONObjBuilder& b) const { - { - BSONObjBuilder bbb(b.subobjStart("lowPriority")); - const auto& lowPriorityTicketStats = _stats[_enumToInt(QueueType::kLowPriority)]; - appendCommonQueueImplStats(bbb, lowPriorityTicketStats); - bbb.append("expedited", expedited()); - bbb.append("bypassed", bypassed()); - bbb.done(); - } - { - BSONObjBuilder bbb(b.subobjStart("normalPriority")); - const auto& normalPriorityTicketStats = _stats[_enumToInt(QueueType::kNormalPriority)]; - appendCommonQueueImplStats(bbb, normalPriorityTicketStats); - bbb.done(); - } - { - BSONObjBuilder bbb(b.subobjStart("immediatePriority")); - // Since 'kImmediate' priority operations will never queue, omit queueing statistics that - // will always be 0. - const auto& immediateTicketStats = _stats[_enumToInt(QueueType::kImmediatePriority)]; - - auto finished = immediateTicketStats.totalFinishedProcessing.loadRelaxed(); - auto started = immediateTicketStats.totalStartedProcessing.loadRelaxed(); - bbb.append("startedProcessing", started); - bbb.append("processing", std::max(static_cast<int>(started - finished), 0)); - bbb.append("finishedProcessing", finished); - bbb.append("totalTimeProcessingMicros", - immediateTicketStats.totalTimeProcessingMicros.loadRelaxed()); - bbb.append("newAdmissions", immediateTicketStats.totalNewAdmissions.loadRelaxed()); - bbb.done(); - } -} - -bool PriorityTicketHolder::_tryAcquireTicket() { - auto remaining = _ticketsAvailable.subtractAndFetch(1); - if (remaining < 0) { - _ticketsAvailable.addAndFetch(1); - return false; - } - return true; -} - -void PriorityTicketHolder::_dequeueWaitingThread( - const ticket_queues::SharedLockGuard& sharedQueueLock) { - // There are only 2 possible queues to dequeue from - the low priority and normal priority - // queues. There will never be anything to dequeue from the immediate priority queue, given - // immediate priority operations will never wait for ticket admission. - auto& lowPriorityQueue = _getQueue(QueueType::kLowPriority); - auto& normalPriorityQueue = _getQueue(QueueType::kNormalPriority); - - // There is a guarantee that the number of queued elements cannot change while holding the - // shared queue lock. - auto lowQueueCount = lowPriorityQueue.queuedElems(); - auto normalQueueCount = normalPriorityQueue.queuedElems(); - - if (lowQueueCount == 0 && normalQueueCount == 0) { - return; - } - if (lowQueueCount == 0) { - normalPriorityQueue.attemptToDequeue(sharedQueueLock); - return; - } - if (normalQueueCount == 0) { - lowPriorityQueue.attemptToDequeue(sharedQueueLock); - return; - } - - // Both queues are non-empty, and the low priority queue is bypassed for dequeue in favor of the - // normal priority queue until the bypass threshold is met. - if (_lowPriorityBypassThreshold > 0 && - _lowPriorityBypassCount.addAndFetch(1) % _lowPriorityBypassThreshold == 0) { - if (lowPriorityQueue.attemptToDequeue(sharedQueueLock)) { - _expeditedLowPriorityAdmissions.addAndFetch(1); - } else { - normalPriorityQueue.attemptToDequeue(sharedQueueLock); - } - return; - } +void TicketHolderWithQueueingStats::_appendCommonQueueImplStats(BSONObjBuilder& b, + const QueueStats& stats) const { + auto removed = stats.totalRemovedQueue.loadRelaxed(); + auto added = stats.totalAddedQueue.loadRelaxed(); - if (!normalPriorityQueue.attemptToDequeue(sharedQueueLock)) { - lowPriorityQueue.attemptToDequeue(sharedQueueLock); - } -} + b.append("addedToQueue", added); + b.append("removedFromQueue", removed); + b.append("queueLength", std::max(static_cast<int>(added - removed), 0)); -bool PriorityTicketHolder::_hasToWaitForHigherPriority(const ticket_queues::UniqueLockGuard& lk, - QueueType queue) { - switch (queue) { - case QueueType::kLowPriority: { - const auto& normalQueue = _getQueue(QueueType::kNormalPriority); - auto pending = normalQueue.getThreadsPendingToWake(); - return pending != 0 && pending >= _ticketsAvailable.load(); - } - default: - return false; - } + auto finished = stats.totalFinishedProcessing.loadRelaxed(); + auto started = stats.totalStartedProcessing.loadRelaxed(); + b.append("startedProcessing", started); + b.append("processing", std::max(static_cast<int>(started - finished), 0)); + b.append("finishedProcessing", finished); + b.append("totalTimeProcessingMicros", stats.totalTimeProcessingMicros.loadRelaxed()); + b.append("canceled", stats.totalCanceled.loadRelaxed()); + b.append("newAdmissions", stats.totalNewAdmissions.loadRelaxed()); + b.append("totalTimeQueuedMicros", stats.totalTimeQueuedMicros.loadRelaxed()); } } // namespace mongo |