diff options
author | Jordi Olivares Provencio <jordi.olivares-provencio@mongodb.com> | 2022-08-26 14:49:55 +0000 |
---|---|---|
committer | Evergreen Agent <no-reply@evergreen.mongodb.com> | 2022-10-05 14:00:50 +0000 |
commit | acab83f1479eaea3b75fe03f2ed9f92f980e6f3a (patch) | |
tree | 7d9ce80c72844cd9481af64247e818458f4dd7a8 | |
parent | 59053967edeea2ace11b0eb9fbe4542dc56a0cab (diff) | |
download | mongo-acab83f1479eaea3b75fe03f2ed9f92f980e6f3a.tar.gz |
SERVER-68854 Refactor SchedulingTicketHolder queueing logic
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 65 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.h | 7 | ||||
-rw-r--r-- | src/mongo/util/concurrency/ticketholder_test.cpp | 18 |
3 files changed, 34 insertions, 56 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index af9198dd243..2d5f9e9d54a 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -637,9 +637,6 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio WaitMode waitMode) { invariant(admCtx); - auto interruptible = - waitMode == WaitMode::kInterruptible ? opCtx : Interruptible::notInterruptible(); - auto& queue = _getQueueToUse(opCtx, admCtx); bool assigned; @@ -647,7 +644,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio stdx::unique_lock lk(_queueMutex); _enqueuedElements.addAndFetch(1); ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); }); - assigned = queue.enqueue(interruptible, lk, until); + assigned = queue.enqueue(opCtx, lk, until, waitMode); } if (assigned) { return Ticket{this, admCtx}; @@ -661,7 +658,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() { while (threadsToBeWoken < _queuedThreads) { auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1); if (canDequeue) { - _queue.notify_one(); + _cv.notify_one(); return true; } } @@ -678,42 +675,40 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() { } } -bool SchedulingTicketHolder::Queue::enqueue(Interruptible* interruptible, +bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx, EnqueuerLockGuard& queueLock, - const Date_t& until) { + const Date_t& until, + WaitMode waitMode) { _queuedThreads++; // Before exiting we remove ourselves from the count of queued threads, we are still holding the // lock here so this is safe. ON_BLOCK_EXIT([&] { _queuedThreads--; }); - bool isFirstCheck = true; + + // TODO SERVER-69179: Replace the custom version of waiting on a condition variable with what + // comes out of SERVER-69178. + auto clockSource = opCtx->getServiceContext()->getPreciseClockSource(); + auto baton = waitMode == WaitMode::kInterruptible ? opCtx->getBaton().get() : nullptr; + + // We need to determine the actual deadline to use. + auto deadline = waitMode == WaitMode::kInterruptible ? std::min(until, opCtx->getDeadline()) + : Date_t::max(); + do { - try { - interruptible->waitForConditionOrInterruptUntil(_queue, queueLock, until, [&] { - // As this block is executed when getting woken we must modify the woken count. - // Otherwise we are prone to deadlocking if we get woken and there are no tickets - // available, permanently signalling that a thread has been woken. - // - // We don't signal that a thread has been woken during the first predicate check as - // the underlying implementation does the following: - // - // while(!pred()) { - // cv.wait(lk); - // } - // - // Thus the predicate will always be called once before waiting. - if (isFirstCheck) { - isFirstCheck = false; - } else { - _signalThreadWoken(); - } - return _holder->_ticketsAvailable.load() > 0; - }); - } catch (...) { - _signalThreadWoken(); - throw; - } - if (Date_t::now() >= until) { - return false; + // We normally would use the opCtx->waitForConditionOrInterruptUntil method for doing this + // check. The problem is that we must call a method that signals that the thread has been + // woken after the condition variable wait, not before which is where the predicate would + // go. + while (_holder->_ticketsAvailable.load() <= 0) { + // This method must be called after getting woken in all cases, so we use a ScopeGuard + // to handle exceptions as well as early returns. + ON_BLOCK_EXIT([&] { _signalThreadWoken(); }); + auto waitResult = clockSource->waitForConditionUntil(_cv, queueLock, deadline, baton); + // We check if the operation has been interrupted (timeout, killed, etc.) here. + if (waitMode == WaitMode::kInterruptible) { + opCtx->checkForInterrupt(); + } + if (waitResult == stdx::cv_status::timeout) + return false; } } while (!_holder->_tryAcquireTicket()); return true; diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h index 66769f1c015..eedaa1bb8bc 100644 --- a/src/mongo/util/concurrency/ticketholder.h +++ b/src/mongo/util/concurrency/ticketholder.h @@ -340,9 +340,10 @@ protected: bool attemptToDequeue(); - bool enqueue(Interruptible* interruptible, + bool enqueue(OperationContext* interruptible, EnqueuerLockGuard& queueLock, - const Date_t& until); + const Date_t& until, + WaitMode waitMode); int queuedElems() const { return _queuedThreads; @@ -353,7 +354,7 @@ protected: int _queuedThreads{0}; AtomicWord<int> _threadsToBeWoken{0}; - stdx::condition_variable _queue; + stdx::condition_variable _cv; SchedulingTicketHolder* _holder; }; diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp index 5baa7508e40..e2c78cfb454 100644 --- a/src/mongo/util/concurrency/ticketholder_test.cpp +++ b/src/mongo/util/concurrency/ticketholder_test.cpp @@ -333,12 +333,6 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) { while (holder.queued() < 2) { } - // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying - // mutex (release + wait the CV is not an atomic operation). To overcome this there is - // currently no other solution but to wait a bit until the threads are really waiting in the - // condition variable. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50)); - ASSERT_EQ(stats["queueLength"], 2); ticket.reset(); @@ -409,12 +403,6 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { while (holder.queued() < 2) { } - // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying - // mutex (release + wait the CV is not an atomic operation). To overcome this there is - // currently no other solution but to wait a bit until the threads are really waiting in the - // condition variable. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50)); - // Release the ticket. ticket.reset(); @@ -442,12 +430,6 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) { while (holder.queued() < 2) { } - // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying - // mutex (release + wait the CV is not an atomic operation). To overcome this there is - // currently no other solution but to wait a bit until the threads are really waiting in the - // condition variable. - stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50)); - // Release the ticket. ticketNormal1Priority.reset(); |