summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.cpp
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-08-25 09:53:17 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-25 10:31:44 +0000
commitb887165489cd44f581946b341a8941d2d27ddfec (patch)
treefd14e95e94346fb5e7be14401b5797d45304348f /src/mongo/util/concurrency/ticketholder.cpp
parent1b12c649156f62b2816aae2d45fac4a35f90eb60 (diff)
downloadmongo-b887165489cd44f581946b341a8941d2d27ddfec.tar.gz
SERVER-68854 Refactor SchedulingTicketHolder queueing logic
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp59
1 files changed, 24 insertions, 35 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index af9198dd243..d9bab22a47b 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -637,9 +637,6 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
WaitMode waitMode) {
invariant(admCtx);
- auto interruptible =
- waitMode == WaitMode::kInterruptible ? opCtx : Interruptible::notInterruptible();
-
auto& queue = _getQueueToUse(opCtx, admCtx);
bool assigned;
@@ -647,7 +644,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
stdx::unique_lock lk(_queueMutex);
_enqueuedElements.addAndFetch(1);
ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
- assigned = queue.enqueue(interruptible, lk, until);
+ assigned = queue.enqueue(opCtx, lk, until, waitMode);
}
if (assigned) {
return Ticket{this, admCtx};
@@ -661,7 +658,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() {
while (threadsToBeWoken < _queuedThreads) {
auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1);
if (canDequeue) {
- _queue.notify_one();
+ _cv.notify_one();
return true;
}
}
@@ -678,42 +675,34 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() {
}
}
-bool SchedulingTicketHolder::Queue::enqueue(Interruptible* interruptible,
+bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx,
EnqueuerLockGuard& queueLock,
- const Date_t& until) {
+ const Date_t& until,
+ WaitMode waitMode) {
_queuedThreads++;
// Before exiting we remove ourselves from the count of queued threads, we are still holding the
// lock here so this is safe.
ON_BLOCK_EXIT([&] { _queuedThreads--; });
- bool isFirstCheck = true;
+
+ auto clockSource = opCtx->getServiceContext()->getPreciseClockSource();
+ auto baton = waitMode == WaitMode::kInterruptible ? opCtx->getBaton().get() : nullptr;
+
do {
- try {
- interruptible->waitForConditionOrInterruptUntil(_queue, queueLock, until, [&] {
- // As this block is executed when getting woken we must modify the woken count.
- // Otherwise we are prone to deadlocking if we get woken and there are no tickets
- // available, permanently signalling that a thread has been woken.
- //
- // We don't signal that a thread has been woken during the first predicate check as
- // the underlying implementation does the following:
- //
- // while(!pred()) {
- // cv.wait(lk);
- // }
- //
- // Thus the predicate will always be called once before waiting.
- if (isFirstCheck) {
- isFirstCheck = false;
- } else {
- _signalThreadWoken();
- }
- return _holder->_ticketsAvailable.load() > 0;
- });
- } catch (...) {
- _signalThreadWoken();
- throw;
- }
- if (Date_t::now() >= until) {
- return false;
+ // We normally would use the opCtx->waitForConditionOrInterruptUntil method for doing this
+ // check. The problem is that we must call a method that signals that the thread has been
+ // woken after the condition variable wait, not before which is where the predicate would
+ // go.
+ while (_holder->_ticketsAvailable.load() <= 0) {
+ // This method must be called after getting woken in all cases, so we use a ScopeGuard
+ // to handle exceptions as well as early returns.
+ ON_BLOCK_EXIT([&] { _signalThreadWoken(); });
+ auto waitResult = clockSource->waitForConditionUntil(_cv, queueLock, until, baton);
+ if (waitResult == stdx::cv_status::timeout)
+ return false;
+ // We check if the operation has been interrupted here.
+ if (waitMode == WaitMode::kInterruptible) {
+ opCtx->checkForInterrupt();
+ }
}
} while (!_holder->_tryAcquireTicket());
return true;