diff options
Diffstat (limited to 'rts/Capability.c')
-rw-r--r-- | rts/Capability.c | 301 |
1 files changed, 186 insertions, 115 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 21f63f39d9..b0b7f307b5 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -43,7 +43,7 @@ nat enabled_capabilities = 0; // The array of Capabilities. It's important that when we need // to allocate more Capabilities we don't have to move the existing // Capabilities, because there may be pointers to them in use -// (e.g. threads in waitForReturnCapability(), see #8209), so this is +// (e.g. threads in waitForCapability(), see #8209), so this is // an array of Capability* rather than an array of Capability. Capability **capabilities = NULL; @@ -450,11 +450,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) #endif /* ---------------------------------------------------------------------------- - * Function: releaseCapability(Capability*) + * releaseCapability * - * Purpose: Letting go of a capability. Causes a - * 'returning worker' thread or a 'waiting worker' - * to wake up, in that order. + * The current Task (cap->task) releases the Capability. The Capability is + * marked free, and if there is any work to do, an appropriate Task is woken up. * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) @@ -474,13 +473,13 @@ releaseCapability_ (Capability* cap, // the go-ahead to return the result of an external call.. if (cap->returning_tasks_hd != NULL) { giveCapabilityToTask(cap,cap->returning_tasks_hd); - // The Task pops itself from the queue (see waitForReturnCapability()) + // The Task pops itself from the queue (see waitForCapability()) return; } // If there is a pending sync, then we should just leave the // Capability free. The thread trying to sync will be about to - // call waitForReturnCapability(). + // call waitForCapability(). if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) { last_free_capability = cap; // needed? debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no); @@ -549,62 +548,156 @@ releaseAndWakeupCapability (Capability* cap USED_IF_THREADS) } static void -releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) +enqueueWorker (Capability* cap USED_IF_THREADS) { Task *task; - ACQUIRE_LOCK(&cap->lock); - task = cap->running_task; // If the Task is stopped, we shouldn't be yielding, we should // be just exiting. ASSERT(!task->stopped); + ASSERT(task->worker); - // If the current task is a worker, save it on the spare_workers - // list of this Capability. A worker can mark itself as stopped, - // in which case it is not replaced on the spare_worker queue. - // This happens when the system is shutting down (see - // Schedule.c:workerStart()). - if (!isBoundTask(task)) + if (cap->n_spare_workers < MAX_SPARE_WORKERS) + { + task->next = cap->spare_workers; + cap->spare_workers = task; + cap->n_spare_workers++; + } + else { - if (cap->n_spare_workers < MAX_SPARE_WORKERS) - { - task->next = cap->spare_workers; - cap->spare_workers = task; - cap->n_spare_workers++; + debugTrace(DEBUG_sched, "%d spare workers already, exiting", + cap->n_spare_workers); + releaseCapability_(cap,rtsFalse); + // hold the lock until after workerTaskStop; c.f. scheduleWorker() + workerTaskStop(task); + RELEASE_LOCK(&cap->lock); + shutdownThread(); + } +} + +#endif + +/* ---------------------------------------------------------------------------- + * waitForWorkerCapability(task) + * + * waits to be given a Capability, and then returns the Capability. The task + * must be either a worker (and on a cap->spare_workers queue), or a bound Task. + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) + +static Capability * waitForWorkerCapability (Task *task) +{ + Capability *cap; + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + debugTrace(DEBUG_sched, "woken up on capability %d", cap->no); + + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + debugTrace(DEBUG_sched, + "capability %d is owned by another task", cap->no); + RELEASE_LOCK(&cap->lock); + continue; } - else - { - debugTrace(DEBUG_sched, "%d spare workers already, exiting", - cap->n_spare_workers); - releaseCapability_(cap,rtsFalse); - // hold the lock until after workerTaskStop; c.f. scheduleWorker() - workerTaskStop(task); + + if (task->cap != cap) { + // see Note [migrated bound threads] + debugTrace(DEBUG_sched, + "task has been migrated to cap %d", task->cap->no); RELEASE_LOCK(&cap->lock); - shutdownThread(); + continue; + } + + if (task->incall->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + cap->n_spare_workers--; } + + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; } - // Bound tasks just float around attached to their TSOs. - releaseCapability_(cap,rtsFalse); + return cap; +} - RELEASE_LOCK(&cap->lock); +#endif /* THREADED_RTS */ + +/* ---------------------------------------------------------------------------- + * waitForReturnCapability (Task *task) + * + * The Task should be on the cap->returning_tasks queue of a Capability. This + * function waits for the Task to be woken up, and returns the Capability that + * it was woken up on. + * + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) + +static Capability * waitForReturnCapability (Task *task) +{ + Capability *cap; + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + return cap; } -#endif + +#endif /* THREADED_RTS */ /* ---------------------------------------------------------------------------- - * waitForReturnCapability (Capability **pCap, Task *task) + * waitForCapability (Capability **pCap, Task *task) * * Purpose: when an OS thread returns from an external call, - * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * it calls waitForCapability() (via Schedule.resumeThread()) * to wait for permission to enter the RTS & communicate the * result of the external call back to the Haskell thread that * made it. * * ------------------------------------------------------------------------- */ -void -waitForReturnCapability (Capability **pCap, Task *task) + +void waitForCapability (Capability **pCap, Task *task) { #if !defined(THREADED_RTS) @@ -641,10 +734,9 @@ waitForReturnCapability (Capability **pCap, Task *task) ASSERT(task->cap == cap); } - ACQUIRE_LOCK(&cap->lock); - debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no); + ACQUIRE_LOCK(&cap->lock); if (!cap->running_task) { // It's free; just grab it cap->running_task = task; @@ -652,31 +744,7 @@ waitForReturnCapability (Capability **pCap, Task *task) } else { newReturningTask(cap,task); RELEASE_LOCK(&cap->lock); - - for (;;) { - ACQUIRE_LOCK(&task->lock); - // task->lock held, cap->lock not held - if (!task->wakeup) waitCondition(&task->cond, &task->lock); - cap = task->cap; - task->wakeup = rtsFalse; - RELEASE_LOCK(&task->lock); - - // now check whether we should wake up... - ACQUIRE_LOCK(&cap->lock); - if (cap->running_task == NULL) { - if (cap->returning_tasks_hd != task) { - giveCapabilityToTask(cap,cap->returning_tasks_hd); - RELEASE_LOCK(&cap->lock); - continue; - } - cap->running_task = task; - popReturningTask(cap); - RELEASE_LOCK(&cap->lock); - break; - } - RELEASE_LOCK(&cap->lock); - } - + cap = waitForReturnCapability(task); } #ifdef PROFILING @@ -691,11 +759,30 @@ waitForReturnCapability (Capability **pCap, Task *task) #endif } -#if defined(THREADED_RTS) /* ---------------------------------------------------------------------------- * yieldCapability + * + * Give up the Capability, and return when we have it again. This is called + * when either we know that the Capability should be given to another Task, or + * there is nothing to do right now. One of the following is true: + * + * - The current Task is a worker, and there's a bound thread at the head of + * the run queue (or vice versa) + * + * - The run queue is empty. We'll be woken up again when there's work to + * do. + * + * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR). + * We should become a GC worker for a while. + * + * - Another Task is trying to acquire all the Capabilities (pending_sync != + * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or + * setNumCapabilities. We should give up the Capability temporarily. + * * ------------------------------------------------------------------------- */ +#if defined (THREADED_RTS) + /* See Note [GC livelock] in Schedule.c for why we have gcAllowed and return the rtsBool */ rtsBool /* Did we GC? */ @@ -714,63 +801,39 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed) } } - debugTrace(DEBUG_sched, "giving up capability %d", cap->no); + debugTrace(DEBUG_sched, "giving up capability %d", cap->no); - // We must now release the capability and wait to be woken up - // again. - task->wakeup = rtsFalse; - releaseCapabilityAndQueueWorker(cap); - - for (;;) { - ACQUIRE_LOCK(&task->lock); - // task->lock held, cap->lock not held - if (!task->wakeup) waitCondition(&task->cond, &task->lock); - cap = task->cap; - task->wakeup = rtsFalse; - RELEASE_LOCK(&task->lock); - - debugTrace(DEBUG_sched, "woken up on capability %d", cap->no); - - ACQUIRE_LOCK(&cap->lock); - if (cap->running_task != NULL) { - debugTrace(DEBUG_sched, - "capability %d is owned by another task", cap->no); - RELEASE_LOCK(&cap->lock); - continue; - } + // We must now release the capability and wait to be woken up again. + task->wakeup = rtsFalse; - if (task->cap != cap) { - // see Note [migrated bound threads] - debugTrace(DEBUG_sched, - "task has been migrated to cap %d", task->cap->no); - RELEASE_LOCK(&cap->lock); - continue; - } + ACQUIRE_LOCK(&cap->lock); - if (task->incall->tso == NULL) { - ASSERT(cap->spare_workers != NULL); - // if we're not at the front of the queue, release it - // again. This is unlikely to happen. - if (cap->spare_workers != task) { - giveCapabilityToTask(cap,cap->spare_workers); - RELEASE_LOCK(&cap->lock); - continue; - } - cap->spare_workers = task->next; - task->next = NULL; - cap->n_spare_workers--; - } + // If this is a worker thread, put it on the spare_workers queue + if (isWorker(task)) { + enqueueWorker(cap); + } - cap->running_task = task; - RELEASE_LOCK(&cap->lock); - break; - } + releaseCapability_(cap, rtsFalse); - debugTrace(DEBUG_sched, "resuming capability %d", cap->no); - ASSERT(cap->running_task == task); + if (isWorker(task) || isBoundTask(task)) { + RELEASE_LOCK(&cap->lock); + cap = waitForWorkerCapability(task); + } else { + // Not a worker Task, or a bound Task. The only way we can be woken up + // again is to put ourselves on the returning_tasks queue, so that's + // what we do. We still hold cap->lock at this point + // The Task waiting for this Capability does not have it + // yet, so we can be sure to be woken up later. (see #10545) + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + cap = waitForReturnCapability(task); + } + + debugTrace(DEBUG_sched, "resuming capability %d", cap->no); + ASSERT(cap->running_task == task); #ifdef PROFILING - cap->r.rCCCS = CCS_SYSTEM; + cap->r.rCCCS = CCS_SYSTEM; #endif *pCap = cap; @@ -780,6 +843,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed) return rtsFalse; } +#endif /* THREADED_RTS */ + // Note [migrated bound threads] // // There's a tricky case where: @@ -815,6 +880,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed) * get every Capability into the GC. * ------------------------------------------------------------------------- */ +#if defined (THREADED_RTS) + void prodCapability (Capability *cap, Task *task) { @@ -826,6 +893,8 @@ prodCapability (Capability *cap, Task *task) RELEASE_LOCK(&cap->lock); } +#endif /* THREADED_RTS */ + /* ---------------------------------------------------------------------------- * tryGrabCapability * @@ -833,6 +902,8 @@ prodCapability (Capability *cap, Task *task) * * ------------------------------------------------------------------------- */ +#if defined (THREADED_RTS) + rtsBool tryGrabCapability (Capability *cap, Task *task) { |