summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorJordi Olivares Provencio <jordi.olivares-provencio@mongodb.com>2022-08-26 14:49:55 +0000
committerEvergreen Agent <no-reply@evergreen.mongodb.com>2022-08-26 16:49:19 +0000
commit1bb3510dbff6981839aca538ee0cf1d230878a9c (patch)
tree79ec606949cc7f13e79cacf58178490f244744bb
parent975e47269a0cf27c4781fc69517c74438334d545 (diff)
downloadmongo-1bb3510dbff6981839aca538ee0cf1d230878a9c.tar.gz
SERVER-68854 Refactor SchedulingTicketHolder queueing logic
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp65
-rw-r--r--src/mongo/util/concurrency/ticketholder.h7
-rw-r--r--src/mongo/util/concurrency/ticketholder_test.cpp18
3 files changed, 34 insertions, 56 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index af9198dd243..2d5f9e9d54a 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -637,9 +637,6 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
WaitMode waitMode) {
invariant(admCtx);
- auto interruptible =
- waitMode == WaitMode::kInterruptible ? opCtx : Interruptible::notInterruptible();
-
auto& queue = _getQueueToUse(opCtx, admCtx);
bool assigned;
@@ -647,7 +644,7 @@ boost::optional<Ticket> SchedulingTicketHolder::_waitForTicketUntilImpl(Operatio
stdx::unique_lock lk(_queueMutex);
_enqueuedElements.addAndFetch(1);
ON_BLOCK_EXIT([&] { _enqueuedElements.subtractAndFetch(1); });
- assigned = queue.enqueue(interruptible, lk, until);
+ assigned = queue.enqueue(opCtx, lk, until, waitMode);
}
if (assigned) {
return Ticket{this, admCtx};
@@ -661,7 +658,7 @@ bool SchedulingTicketHolder::Queue::attemptToDequeue() {
while (threadsToBeWoken < _queuedThreads) {
auto canDequeue = _threadsToBeWoken.compareAndSwap(&threadsToBeWoken, threadsToBeWoken + 1);
if (canDequeue) {
- _queue.notify_one();
+ _cv.notify_one();
return true;
}
}
@@ -678,42 +675,40 @@ void SchedulingTicketHolder::Queue::_signalThreadWoken() {
}
}
-bool SchedulingTicketHolder::Queue::enqueue(Interruptible* interruptible,
+bool SchedulingTicketHolder::Queue::enqueue(OperationContext* opCtx,
EnqueuerLockGuard& queueLock,
- const Date_t& until) {
+ const Date_t& until,
+ WaitMode waitMode) {
_queuedThreads++;
// Before exiting we remove ourselves from the count of queued threads, we are still holding the
// lock here so this is safe.
ON_BLOCK_EXIT([&] { _queuedThreads--; });
- bool isFirstCheck = true;
+
+ // TODO SERVER-69179: Replace the custom version of waiting on a condition variable with what
+ // comes out of SERVER-69178.
+ auto clockSource = opCtx->getServiceContext()->getPreciseClockSource();
+ auto baton = waitMode == WaitMode::kInterruptible ? opCtx->getBaton().get() : nullptr;
+
+ // We need to determine the actual deadline to use.
+ auto deadline = waitMode == WaitMode::kInterruptible ? std::min(until, opCtx->getDeadline())
+ : Date_t::max();
+
do {
- try {
- interruptible->waitForConditionOrInterruptUntil(_queue, queueLock, until, [&] {
- // As this block is executed when getting woken we must modify the woken count.
- // Otherwise we are prone to deadlocking if we get woken and there are no tickets
- // available, permanently signalling that a thread has been woken.
- //
- // We don't signal that a thread has been woken during the first predicate check as
- // the underlying implementation does the following:
- //
- // while(!pred()) {
- // cv.wait(lk);
- // }
- //
- // Thus the predicate will always be called once before waiting.
- if (isFirstCheck) {
- isFirstCheck = false;
- } else {
- _signalThreadWoken();
- }
- return _holder->_ticketsAvailable.load() > 0;
- });
- } catch (...) {
- _signalThreadWoken();
- throw;
- }
- if (Date_t::now() >= until) {
- return false;
+ // We normally would use the opCtx->waitForConditionOrInterruptUntil method for doing this
+ // check. The problem is that we must call a method that signals that the thread has been
+ // woken after the condition variable wait, not before which is where the predicate would
+ // go.
+ while (_holder->_ticketsAvailable.load() <= 0) {
+ // This method must be called after getting woken in all cases, so we use a ScopeGuard
+ // to handle exceptions as well as early returns.
+ ON_BLOCK_EXIT([&] { _signalThreadWoken(); });
+ auto waitResult = clockSource->waitForConditionUntil(_cv, queueLock, deadline, baton);
+ // We check if the operation has been interrupted (timeout, killed, etc.) here.
+ if (waitMode == WaitMode::kInterruptible) {
+ opCtx->checkForInterrupt();
+ }
+ if (waitResult == stdx::cv_status::timeout)
+ return false;
}
} while (!_holder->_tryAcquireTicket());
return true;
diff --git a/src/mongo/util/concurrency/ticketholder.h b/src/mongo/util/concurrency/ticketholder.h
index 66769f1c015..eedaa1bb8bc 100644
--- a/src/mongo/util/concurrency/ticketholder.h
+++ b/src/mongo/util/concurrency/ticketholder.h
@@ -340,9 +340,10 @@ protected:
bool attemptToDequeue();
- bool enqueue(Interruptible* interruptible,
+ bool enqueue(OperationContext* interruptible,
EnqueuerLockGuard& queueLock,
- const Date_t& until);
+ const Date_t& until,
+ WaitMode waitMode);
int queuedElems() const {
return _queuedThreads;
@@ -353,7 +354,7 @@ protected:
int _queuedThreads{0};
AtomicWord<int> _threadsToBeWoken{0};
- stdx::condition_variable _queue;
+ stdx::condition_variable _cv;
SchedulingTicketHolder* _holder;
};
diff --git a/src/mongo/util/concurrency/ticketholder_test.cpp b/src/mongo/util/concurrency/ticketholder_test.cpp
index 5baa7508e40..e2c78cfb454 100644
--- a/src/mongo/util/concurrency/ticketholder_test.cpp
+++ b/src/mongo/util/concurrency/ticketholder_test.cpp
@@ -333,12 +333,6 @@ TEST_F(TicketHolderTest, PriorityTwoQueuedOperations) {
while (holder.queued() < 2) {
}
- // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying
- // mutex (release + wait the CV is not an atomic operation). To overcome this there is
- // currently no other solution but to wait a bit until the threads are really waiting in the
- // condition variable.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50));
-
ASSERT_EQ(stats["queueLength"], 2);
ticket.reset();
@@ -409,12 +403,6 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
while (holder.queued() < 2) {
}
- // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying
- // mutex (release + wait the CV is not an atomic operation). To overcome this there is
- // currently no other solution but to wait a bit until the threads are really waiting in the
- // condition variable.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50));
-
// Release the ticket.
ticket.reset();
@@ -442,12 +430,6 @@ TEST_F(TicketHolderTest, PriorityTwoNormalOneLowQueuedOperations) {
while (holder.queued() < 2) {
}
- // Waiting for a condition variable in MongoDB is not an atomic operation for the underlying
- // mutex (release + wait the CV is not an atomic operation). To overcome this there is
- // currently no other solution but to wait a bit until the threads are really waiting in the
- // condition variable.
- stdx::this_thread::sleep_for(stdx::chrono::milliseconds(50));
-
// Release the ticket.
ticketNormal1Priority.reset();