summaryrefslogtreecommitdiff
path: root/src/mongo/util/concurrency/ticketholder.cpp
diff options
context:
space:
mode:
Diffstat (limited to 'src/mongo/util/concurrency/ticketholder.cpp')
-rw-r--r--src/mongo/util/concurrency/ticketholder.cpp90
1 files changed, 48 insertions, 42 deletions
diff --git a/src/mongo/util/concurrency/ticketholder.cpp b/src/mongo/util/concurrency/ticketholder.cpp
index e46ff7c0c0f..4043eea04b7 100644
--- a/src/mongo/util/concurrency/ticketholder.cpp
+++ b/src/mongo/util/concurrency/ticketholder.cpp
@@ -262,61 +262,63 @@ bool SemaphoreTicketHolder::_tryAcquire() {
}
#endif
-FifoTicketHolder::FifoTicketHolder(int num)
- : _capacity(num), _numAvailable(num), _elementsInQueue(0) {}
+FifoTicketHolder::FifoTicketHolder(int num) : _capacity(num) {
+ _ticketsAvailable.store(num);
+ _enqueuedElements.store(0);
+}
FifoTicketHolder::~FifoTicketHolder() {}
int FifoTicketHolder::available() const {
- return _numAvailable.load();
+ return _ticketsAvailable.load();
}
int FifoTicketHolder::used() const {
- return _capacity.load() - _numAvailable.load();
+ return outof() - available();
}
int FifoTicketHolder::outof() const {
return _capacity.load();
}
-void FifoTicketHolder::_release(WithLock) {
- auto newAvailable = _numAvailable.addAndFetch(1);
- // This is not an optimization but defensively programming against possible edge cases that we
- // haven't thought of. For example, if we have the situation where we have N tickets available
- // but something is in the queue, we must push it to the end of the queue. In this case having a
- // batch release process would be beneficial as it would remove as many elements as it can from
- // the queue, leading us back to the fast ticket path in the normal case.
- while (newAvailable > 0) {
- auto waitingElement = _queue.tryPop();
- if (waitingElement) {
- auto& elem = *waitingElement;
- _elementsInQueue.subtractAndFetch(1);
- stdx::lock_guard lk(elem->modificationMutex);
- if (elem->state != WaitingState::Waiting) {
- // If the operation has already been finalized we skip the element and don't assign
- // a ticket.
- continue;
+void FifoTicketHolder::release() {
+ stdx::lock_guard lk(_queueMutex);
+ // This loop will most of the time be executed only once. In case some operations in the
+ // queue have been cancelled or already took a ticket the releasing operation should search for
+ // a waiting operation to avoid leaving an operation waiting indefinitely.
+ while (true) {
+ if (!_queue.empty()) {
+ auto& elem = _queue.front();
+ _enqueuedElements.subtractAndFetch(1);
+ {
+ stdx::lock_guard elemLk(elem->modificationMutex);
+ if (elem->state != WaitingState::Waiting) {
+ // If the operation has already been finalized we skip the element and don't
+ // assign a ticket.
+ _queue.pop();
+ continue;
+ }
+ elem->state = WaitingState::Assigned;
}
- elem->state = WaitingState::Assigned;
elem->signaler.notify_all();
- newAvailable = _numAvailable.subtractAndFetch(1);
+ _queue.pop();
} else {
- return;
+ _ticketsAvailable.addAndFetch(1);
}
+ return;
}
-} // namespace mongo
-
-void FifoTicketHolder::release() {
- stdx::lock_guard lk(_queueMutex);
- _release(lk);
}
bool FifoTicketHolder::tryAcquire() {
- stdx::lock_guard lk(_queueMutex);
- if (_numAvailable.load() > 0 && _elementsInQueue.load() == 0) {
- _numAvailable.subtractAndFetch(1);
- return true;
- } else {
+ auto queued = _enqueuedElements.load();
+ if (queued > 0)
+ return false;
+
+ auto remaining = _ticketsAvailable.subtractAndFetch(1);
+ if (remaining < 0) {
+ _ticketsAvailable.addAndFetch(1);
return false;
}
+
+ return true;
}
void FifoTicketHolder::waitForTicket(OperationContext* opCtx) {
@@ -333,14 +335,19 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until)
waitingElement->state = WaitingState::Waiting;
{
stdx::lock_guard lk(_queueMutex);
- if (opCtx) {
- _queue.push(std::shared_ptr(waitingElement), opCtx);
- } else {
- _queue.push(std::shared_ptr(waitingElement));
+ _enqueuedElements.addAndFetch(1);
+ // Check for available tickets under the queue lock, in case a ticket has just been
+ // released.
+ auto remaining = _ticketsAvailable.subtractAndFetch(1);
+ if (remaining >= 0) {
+ _enqueuedElements.subtractAndFetch(1);
+ return true;
}
- _elementsInQueue.addAndFetch(1);
+ _ticketsAvailable.addAndFetch(1);
+ // We copy-construct the shared_ptr here as the waiting element needs to be alive in both
+ // release() and waitForTicket(). Otherwise the code could lead to a segmentation fault
+ _queue.emplace(waitingElement);
}
-
ScopeGuard cancelWait([&] {
bool hasAssignedTicket = false;
{
@@ -351,8 +358,7 @@ bool FifoTicketHolder::waitForTicketUntil(OperationContext* opCtx, Date_t until)
if (hasAssignedTicket) {
// To cover the edge case of getting a ticket assigned before cancelling the ticket
// request. As we have been granted a ticket we must release it.
- stdx::lock_guard queueLock(_queueMutex);
- _release(queueLock);
+ release();
}
});