diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2022-08-25 09:53:17 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-08-25 10:31:44 +0000 |
commit | b887165489cd44f581946b341a8941d2d27ddfec (patch) | |
tree | fd14e95e94346fb5e7be14401b5797d45304348f /src/mongo/util/concurrency/ticketholder.cpp | |
parent | 1b12c649156f62b2816aae2d45fac4a35f90eb60 (diff) | |
download | mongo-b887165489cd44f581946b341a8941d2d27ddfec.tar.gz |
SERVER-68854 Refactor SchedulingTicketHolder queueing logic
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 59 |
1 files changed, 24 insertions, 35 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index af9198dd243..d9bab22a47b 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -637,9 +637,6 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio WaitMode waitMode) { invariant(admCtx); - auto interruptible = - waitMode == WaitMode::kInterruptible ? opCtx : Interruptible::notInterruptible(); - auto& queue = _getQueueToUse(opCtx, admCtx); bool assigned; @@ -647,7 +644,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio stdx::unique_lock lk(_queueMutex); _enqueuedElements.addAndFetch(1); ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - assigned = queue.enqueue(interruptible, lk, until); + assigned = queue.enqueue(opCtx, lk, until, waitMode); } if (assigned) { return Ticket{this, admCtx}; @@ -661,7 +658,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() { while (threadsToBeWoken < _queuedThreads) { auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1); if (canDequeue) { - _queue.notify_one(); + _cv.notify_one(); return true; } } @@ -678,42 +675,34 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() { } } -bool SchedulingTicketHolder::Queue::enqueue(Interruptible* interruptible, +bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx, EnqueuerLockGuard& queueLock, - const Date_t& until) { + const Date_t& until, + WaitMode waitMode) { _queuedThreads++; // Before exiting we remove ourselves from the count of queued threads, we are still holding the // lock here so this is safe. ON_BLOCK_EXIT([&] { _queuedThreads--; }); - bool isFirstCheck = true; + + auto clockSource = opCtx->getServiceContext()->getPreciseClockSource(); + auto baton = waitMode == WaitMode::kInterruptible ? opCtx->getBaton().get() : nullptr; + do { - try { - interruptible->waitForConditionOrInterruptUntil(_queue, queueLock, until, [&] { - // As this block is executed when getting woken we must modify the woken count. - // Otherwise we are prone to deadlocking if we get woken and there are no tickets - // available, permanently signalling that a thread has been woken. - // - // We don't signal that a thread has been woken during the first predicate check as - // the underlying implementation does the following: - // - // while(!pred()) { - // cv.wait(lk); - // } - // - // Thus the predicate will always be called once before waiting. - if (isFirstCheck) { - isFirstCheck = false; - } else { - _signalThreadWoken(); - } - return _holder->_ticketsAvailable.load() > 0; - }); - } catch (...) { - _signalThreadWoken(); - throw; - } - if (Date_t::now() >= until) { - return false; + // We normally would use the opCtx->waitForConditionOrInterruptUntil method for doing this + // check. The problem is that we must call a method that signals that the thread has been + // woken after the condition variable wait, not before which is where the predicate would + // go. + while (_holder->_ticketsAvailable.load() <= 0) { + // This method must be called after getting woken in all cases, so we use a ScopeGuard + // to handle exceptions as well as early returns. + ON_BLOCK_EXIT([&] { _signalThreadWoken(); }); + auto waitResult = clockSource->waitForConditionUntil(_cv, queueLock, until, baton); + if (waitResult == stdx::cv_status::timeout) + return false; + // We check if the operation has been interrupted here. + if (waitMode == WaitMode::kInterruptible) { + opCtx->checkForInterrupt(); + } } } while (!_holder->_tryAcquireTicket()); return true; |