diff options
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r-- | src/mongo/util/concurrency/ticketholder.cpp | 90 |
1 files changed, 48 insertions, 42 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp index e46ff7c0c0f..4043eea04b7 100644 --- a/src/mongo/util/concurrency/ticketholder.cpp +++ b/src/mongo/util/concurrency/ticketholder.cpp @@ -262,61 +262,63 @@ bool SemaphoreTicketHolder::_tryAcquire() { } #endif -FifoTicketHolder::FifoTicketHolder(int num) - : _capacity(num), _numAvailable(num), _elementsInQueue(0) {} +FifoTicketHolder::FifoTicketHolder(int num) : _capacity(num) { + _ticketsAvailable.store(num); + _enqueuedElements.store(0); +} FifoTicketHolder::~FifoTicketHolder() {} int FifoTicketHolder::available() const { - return _numAvailable.load(); + return _ticketsAvailable.load(); } int FifoTicketHolder::used() const { - return _capacity.load() - _numAvailable.load(); + return outof() - available(); } int FifoTicketHolder::outof() const { return _capacity.load(); } -void FifoTicketHolder::_release(WithLock) { - auto newAvailable = _numAvailable.addAndFetch(1); - // This is not an optimization but defensively programming against possible edge cases that we - // haven't thought of. For example, if we have the situation where we have N tickets available - // but something is in the queue, we must push it to the end of the queue. In this case having a - // batch release process would be beneficial as it would remove as many elements as it can from - // the queue, leading us back to the fast ticket path in the normal case. - while (newAvailable > 0) { - auto waitingElement = _queue.tryPop(); - if (waitingElement) { - auto& elem = *waitingElement; - _elementsInQueue.subtractAndFetch(1); - stdx::lock_guard lk(elem->modificationMutex); - if (elem->state != WaitingState::Waiting) { - // If the operation has already been finalized we skip the element and don't assign - // a ticket. - continue; +void FifoTicketHolder::release() { + stdx::lock_guard lk(_queueMutex); + // This loop will most of the time be executed only once. In case some operations in the + // queue have been cancelled or already took a ticket the releasing operation should search for + // a waiting operation to avoid leaving an operation waiting indefinitely. + while (true) { + if (!_queue.empty()) { + auto& elem = _queue.front(); + _enqueuedElements.subtractAndFetch(1); + { + stdx::lock_guard elemLk(elem->modificationMutex); + if (elem->state != WaitingState::Waiting) { + // If the operation has already been finalized we skip the element and don't + // assign a ticket. + _queue.pop(); + continue; + } + elem->state = WaitingState::Assigned; } - elem->state = WaitingState::Assigned; elem->signaler.notify_all(); - newAvailable = _numAvailable.subtractAndFetch(1); + _queue.pop(); } else { - return; + _ticketsAvailable.addAndFetch(1); } + return; } -} // namespace mongo - -void FifoTicketHolder::release() { - stdx::lock_guard lk(_queueMutex); - _release(lk); } bool FifoTicketHolder::tryAcquire() { - stdx::lock_guard lk(_queueMutex); - if (_numAvailable.load() > 0 && _elementsInQueue.load() == 0) { - _numAvailable.subtractAndFetch(1); - return true; - } else { + auto queued = _enqueuedElements.load(); + if (queued > 0) + return false; + + auto remaining = _ticketsAvailable.subtractAndFetch(1); + if (remaining < 0) { + _ticketsAvailable.addAndFetch(1); return false; } + + return true; } void FifoTicketHolder::waitForTicket(OperationContext* opCtx) { @@ -333,14 +335,19 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) waitingElement->state = WaitingState::Waiting; { stdx::lock_guard lk(_queueMutex); - if (opCtx) { - _queue.push(std::shared_ptr(waitingElement), opCtx); - } else { - _queue.push(std::shared_ptr(waitingElement)); + _enqueuedElements.addAndFetch(1); + // Check for available tickets under the queue lock, in case a ticket has just been + // released. + auto remaining = _ticketsAvailable.subtractAndFetch(1); + if (remaining >= 0) { + _enqueuedElements.subtractAndFetch(1); + return true; } - _elementsInQueue.addAndFetch(1); + _ticketsAvailable.addAndFetch(1); + // We copy-construct the shared_ptr here as the waiting element needs to be alive in both + // release() and waitForTicket(). Otherwise the code could lead to a segmentation fault + _queue.emplace(waitingElement); } - ScopeGuard cancelWait([&] { bool hasAssignedTicket = false; { @@ -351,8 +358,7 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until) if (hasAssignedTicket) { // To cover the edge case of getting a ticket assigned before cancelling the ticket // request. As we have been granted a ticket we must release it. - stdx::lock_guard queueLock(_queueMutex); - _release(queueLock); + release(); } }); |