diff options
Diffstat (limited to 'rts/Capability.c')
-rw-r--r-- | rts/Capability.c | 668 |
1 files changed, 668 insertions, 0 deletions
diff --git a/rts/Capability.c b/rts/Capability.c new file mode 100644 index 0000000000..51a42ef468 --- /dev/null +++ b/rts/Capability.c @@ -0,0 +1,668 @@ +/* --------------------------------------------------------------------------- + * + * (c) The GHC Team, 2003-2006 + * + * Capabilities + * + * A Capability represent the token required to execute STG code, + * and all the state an OS thread/task needs to run Haskell code: + * its STG registers, a pointer to its TSO, a nursery etc. During + * STG execution, a pointer to the capabilitity is kept in a + * register (BaseReg; actually it is a pointer to cap->r). + * + * Only in an THREADED_RTS build will there be multiple capabilities, + * for non-threaded builds there is only one global capability, namely + * MainCapability. + * + * --------------------------------------------------------------------------*/ + +#include "PosixSource.h" +#include "Rts.h" +#include "RtsUtils.h" +#include "RtsFlags.h" +#include "STM.h" +#include "OSThreads.h" +#include "Capability.h" +#include "Schedule.h" +#include "Sparks.h" + +// one global capability, this is the Capability for non-threaded +// builds, and for +RTS -N1 +Capability MainCapability; + +nat n_capabilities; +Capability *capabilities = NULL; + +// Holds the Capability which last became free. This is used so that +// an in-call has a chance of quickly finding a free Capability. +// Maintaining a global free list of Capabilities would require global +// locking, so we don't do that. +Capability *last_free_capability; + +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +globalWorkToDo (void) +{ + return blackholes_need_checking + || sched_state >= SCHED_INTERRUPTING + ; +} +#endif + +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +anyWorkForMe( Capability *cap, Task *task ) +{ + if (task->tso != NULL) { + // A bound task only runs if its thread is on the run queue of + // the capability on which it was woken up. Otherwise, we + // can't be sure that we have the right capability: the thread + // might be woken up on some other capability, and task->cap + // could change under our feet. + return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task; + } else { + // A vanilla worker task runs if either there is a lightweight + // thread at the head of the run queue, or the run queue is + // empty and (there are sparks to execute, or there is some + // other global condition to check, such as threads blocked on + // blackholes). + if (emptyRunQueue(cap)) { + return !emptySparkPoolCap(cap) + || !emptyWakeupQueue(cap) + || globalWorkToDo(); + } else + return cap->run_queue_hd->bound == NULL; + } +} +#endif + +/* ----------------------------------------------------------------------------- + * Manage the returning_tasks lists. + * + * These functions require cap->lock + * -------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +newReturningTask (Capability *cap, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->return_link == NULL); + if (cap->returning_tasks_hd) { + ASSERT(cap->returning_tasks_tl->return_link == NULL); + cap->returning_tasks_tl->return_link = task; + } else { + cap->returning_tasks_hd = task; + } + cap->returning_tasks_tl = task; +} + +STATIC_INLINE Task * +popReturningTask (Capability *cap) +{ + ASSERT_LOCK_HELD(&cap->lock); + Task *task; + task = cap->returning_tasks_hd; + ASSERT(task); + cap->returning_tasks_hd = task->return_link; + if (!cap->returning_tasks_hd) { + cap->returning_tasks_tl = NULL; + } + task->return_link = NULL; + return task; +} +#endif + +/* ---------------------------------------------------------------------------- + * Initialisation + * + * The Capability is initially marked not free. + * ------------------------------------------------------------------------- */ + +static void +initCapability( Capability *cap, nat i ) +{ + nat g; + + cap->no = i; + cap->in_haskell = rtsFalse; + + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + +#if defined(THREADED_RTS) + initMutex(&cap->lock); + cap->running_task = NULL; // indicates cap is free + cap->spare_workers = NULL; + cap->suspended_ccalling_tasks = NULL; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; + cap->wakeup_queue_hd = END_TSO_QUEUE; + cap->wakeup_queue_tl = END_TSO_QUEUE; +#endif + + cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1; + cap->f.stgGCFun = (F_)__stg_gc_fun; + + cap->mut_lists = stgMallocBytes(sizeof(bdescr *) * + RtsFlags.GcFlags.generations, + "initCapability"); + + for (g = 0; g < RtsFlags.GcFlags.generations; g++) { + cap->mut_lists[g] = NULL; + } + + cap->free_tvar_wait_queues = END_STM_WAIT_QUEUE; + cap->free_trec_chunks = END_STM_CHUNK_LIST; + cap->free_trec_headers = NO_TREC; + cap->transaction_tokens = 0; +} + +/* --------------------------------------------------------------------------- + * Function: initCapabilities() + * + * Purpose: set up the Capability handling. For the THREADED_RTS build, + * we keep a table of them, the size of which is + * controlled by the user via the RTS flag -N. + * + * ------------------------------------------------------------------------- */ +void +initCapabilities( void ) +{ +#if defined(THREADED_RTS) + nat i; + +#ifndef REG_Base + // We can't support multiple CPUs if BaseReg is not a register + if (RtsFlags.ParFlags.nNodes > 1) { + errorBelch("warning: multiple CPUs not supported in this build, reverting to 1"); + RtsFlags.ParFlags.nNodes = 1; + } +#endif + + n_capabilities = RtsFlags.ParFlags.nNodes; + + if (n_capabilities == 1) { + capabilities = &MainCapability; + // THREADED_RTS must work on builds that don't have a mutable + // BaseReg (eg. unregisterised), so in this case + // capabilities[0] must coincide with &MainCapability. + } else { + capabilities = stgMallocBytes(n_capabilities * sizeof(Capability), + "initCapabilities"); + } + + for (i = 0; i < n_capabilities; i++) { + initCapability(&capabilities[i], i); + } + + IF_DEBUG(scheduler, sched_belch("allocated %d capabilities", + n_capabilities)); + +#else /* !THREADED_RTS */ + + n_capabilities = 1; + capabilities = &MainCapability; + initCapability(&MainCapability, 0); + +#endif + + // There are no free capabilities to begin with. We will start + // a worker Task to each Capability, which will quickly put the + // Capability on the free list when it finds nothing to do. + last_free_capability = &capabilities[0]; +} + +/* ---------------------------------------------------------------------------- + * Give a Capability to a Task. The task must currently be sleeping + * on its condition variable. + * + * Requires cap->lock (modifies cap->running_task). + * + * When migrating a Task, the migrater must take task->lock before + * modifying task->cap, to synchronise with the waking up Task. + * Additionally, the migrater should own the Capability (when + * migrating the run queue), or cap->lock (when migrating + * returning_workers). + * + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +STATIC_INLINE void +giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) +{ + ASSERT_LOCK_HELD(&cap->lock); + ASSERT(task->cap == cap); + IF_DEBUG(scheduler, + sched_belch("passing capability %d to %s %p", + cap->no, task->tso ? "bound task" : "worker", + (void *)task->id)); + ACQUIRE_LOCK(&task->lock); + task->wakeup = rtsTrue; + // the wakeup flag is needed because signalCondition() doesn't + // flag the condition if the thread is already runniing, but we want + // it to be sticky. + signalCondition(&task->cond); + RELEASE_LOCK(&task->lock); +} +#endif + +/* ---------------------------------------------------------------------------- + * Function: releaseCapability(Capability*) + * + * Purpose: Letting go of a capability. Causes a + * 'returning worker' thread or a 'waiting worker' + * to wake up, in that order. + * ------------------------------------------------------------------------- */ + +#if defined(THREADED_RTS) +void +releaseCapability_ (Capability* cap) +{ + Task *task; + + task = cap->running_task; + + ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task); + + cap->running_task = NULL; + + // Check to see whether a worker thread can be given + // the go-ahead to return the result of an external call.. + if (cap->returning_tasks_hd != NULL) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + // The Task pops itself from the queue (see waitForReturnCapability()) + return; + } + + // If the next thread on the run queue is a bound thread, + // give this Capability to the appropriate Task. + if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { + // Make sure we're not about to try to wake ourselves up + ASSERT(task != cap->run_queue_hd->bound); + task = cap->run_queue_hd->bound; + giveCapabilityToTask(cap,task); + return; + } + + if (!cap->spare_workers) { + // Create a worker thread if we don't have one. If the system + // is interrupted, we only create a worker task if there + // are threads that need to be completed. If the system is + // shutting down, we never create a new worker. + if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { + IF_DEBUG(scheduler, + sched_belch("starting new worker on capability %d", cap->no)); + startWorkerTask(cap, workerStart); + return; + } + } + + // If we have an unbound thread on the run queue, or if there's + // anything else to do, give the Capability to a worker thread. + if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap) + || !emptySparkPoolCap(cap) || globalWorkToDo()) { + if (cap->spare_workers) { + giveCapabilityToTask(cap,cap->spare_workers); + // The worker Task pops itself from the queue; + return; + } + } + + last_free_capability = cap; + IF_DEBUG(scheduler, sched_belch("freeing capability %d", cap->no)); +} + +void +releaseCapability (Capability* cap USED_IF_THREADS) +{ + ACQUIRE_LOCK(&cap->lock); + releaseCapability_(cap); + RELEASE_LOCK(&cap->lock); +} + +static void +releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) +{ + Task *task; + + ACQUIRE_LOCK(&cap->lock); + + task = cap->running_task; + + // If the current task is a worker, save it on the spare_workers + // list of this Capability. A worker can mark itself as stopped, + // in which case it is not replaced on the spare_worker queue. + // This happens when the system is shutting down (see + // Schedule.c:workerStart()). + // Also, be careful to check that this task hasn't just exited + // Haskell to do a foreign call (task->suspended_tso). + if (!isBoundTask(task) && !task->stopped && !task->suspended_tso) { + task->next = cap->spare_workers; + cap->spare_workers = task; + } + // Bound tasks just float around attached to their TSOs. + + releaseCapability_(cap); + + RELEASE_LOCK(&cap->lock); +} +#endif + +/* ---------------------------------------------------------------------------- + * waitForReturnCapability( Task *task ) + * + * Purpose: when an OS thread returns from an external call, + * it calls waitForReturnCapability() (via Schedule.resumeThread()) + * to wait for permission to enter the RTS & communicate the + * result of the external call back to the Haskell thread that + * made it. + * + * ------------------------------------------------------------------------- */ +void +waitForReturnCapability (Capability **pCap, Task *task) +{ +#if !defined(THREADED_RTS) + + MainCapability.running_task = task; + task->cap = &MainCapability; + *pCap = &MainCapability; + +#else + Capability *cap = *pCap; + + if (cap == NULL) { + // Try last_free_capability first + cap = last_free_capability; + if (!cap->running_task) { + nat i; + // otherwise, search for a free capability + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + if (!cap->running_task) { + break; + } + } + // Can't find a free one, use last_free_capability. + cap = last_free_capability; + } + + // record the Capability as the one this Task is now assocated with. + task->cap = cap; + + } else { + ASSERT(task->cap == cap); + } + + ACQUIRE_LOCK(&cap->lock); + + IF_DEBUG(scheduler, + sched_belch("returning; I want capability %d", cap->no)); + + if (!cap->running_task) { + // It's free; just grab it + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + } else { + newReturningTask(cap,task); + RELEASE_LOCK(&cap->lock); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + // now check whether we should wake up... + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + if (cap->returning_tasks_hd != task) { + giveCapabilityToTask(cap,cap->returning_tasks_hd); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->running_task = task; + popReturningTask(cap); + RELEASE_LOCK(&cap->lock); + break; + } + RELEASE_LOCK(&cap->lock); + } + + } + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + + IF_DEBUG(scheduler, + sched_belch("returning; got capability %d", cap->no)); + + *pCap = cap; +#endif +} + +#if defined(THREADED_RTS) +/* ---------------------------------------------------------------------------- + * yieldCapability + * ------------------------------------------------------------------------- */ + +void +yieldCapability (Capability** pCap, Task *task) +{ + Capability *cap = *pCap; + + // The fast path has no locking, if we don't enter this while loop + + while ( cap->returning_tasks_hd != NULL || !anyWorkForMe(cap,task) ) { + IF_DEBUG(scheduler, sched_belch("giving up capability %d", cap->no)); + + // We must now release the capability and wait to be woken up + // again. + task->wakeup = rtsFalse; + releaseCapabilityAndQueueWorker(cap); + + for (;;) { + ACQUIRE_LOCK(&task->lock); + // task->lock held, cap->lock not held + if (!task->wakeup) waitCondition(&task->cond, &task->lock); + cap = task->cap; + task->wakeup = rtsFalse; + RELEASE_LOCK(&task->lock); + + IF_DEBUG(scheduler, sched_belch("woken up on capability %d", cap->no)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + IF_DEBUG(scheduler, sched_belch("capability %d is owned by another task", cap->no)); + RELEASE_LOCK(&cap->lock); + continue; + } + + if (task->tso == NULL) { + ASSERT(cap->spare_workers != NULL); + // if we're not at the front of the queue, release it + // again. This is unlikely to happen. + if (cap->spare_workers != task) { + giveCapabilityToTask(cap,cap->spare_workers); + RELEASE_LOCK(&cap->lock); + continue; + } + cap->spare_workers = task->next; + task->next = NULL; + } + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + break; + } + + IF_DEBUG(scheduler, sched_belch("got capability %d", cap->no)); + ASSERT(cap->running_task == task); + } + + *pCap = cap; + + ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + + return; +} + +/* ---------------------------------------------------------------------------- + * Wake up a thread on a Capability. + * + * This is used when the current Task is running on a Capability and + * wishes to wake up a thread on a different Capability. + * ------------------------------------------------------------------------- */ + +void +wakeupThreadOnCapability (Capability *cap, StgTSO *tso) +{ + ASSERT(tso->cap == cap); + ASSERT(tso->bound ? tso->bound->cap == cap : 1); + + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task == NULL) { + // nobody is running this Capability, we can add our thread + // directly onto the run queue and start up a Task to run it. + appendToRunQueue(cap,tso); + + // start it up + cap->running_task = myTask(); // precond for releaseCapability_() + releaseCapability_(cap); + } else { + appendToWakeupQueue(cap,tso); + // someone is running on this Capability, so it cannot be + // freed without first checking the wakeup queue (see + // releaseCapability_). + } + RELEASE_LOCK(&cap->lock); +} + +/* ---------------------------------------------------------------------------- + * prodCapabilities + * + * Used to indicate that the interrupted flag is now set, or some + * other global condition that might require waking up a Task on each + * Capability. + * ------------------------------------------------------------------------- */ + +static void +prodCapabilities(rtsBool all) +{ + nat i; + Capability *cap; + Task *task; + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + if (cap->spare_workers) { + task = cap->spare_workers; + ASSERT(!task->stopped); + giveCapabilityToTask(cap,task); + if (!all) { + RELEASE_LOCK(&cap->lock); + return; + } + } + } + RELEASE_LOCK(&cap->lock); + } + return; +} + +void +prodAllCapabilities (void) +{ + prodCapabilities(rtsTrue); +} + +/* ---------------------------------------------------------------------------- + * prodOneCapability + * + * Like prodAllCapabilities, but we only require a single Task to wake + * up in order to service some global event, such as checking for + * deadlock after some idle time has passed. + * ------------------------------------------------------------------------- */ + +void +prodOneCapability (void) +{ + prodCapabilities(rtsFalse); +} + +/* ---------------------------------------------------------------------------- + * shutdownCapability + * + * At shutdown time, we want to let everything exit as cleanly as + * possible. For each capability, we let its run queue drain, and + * allow the workers to stop. + * + * This function should be called when interrupted and + * shutting_down_scheduler = rtsTrue, thus any worker that wakes up + * will exit the scheduler and call taskStop(), and any bound thread + * that wakes up will return to its caller. Runnable threads are + * killed. + * + * ------------------------------------------------------------------------- */ + +void +shutdownCapability (Capability *cap, Task *task) +{ + nat i; + + ASSERT(sched_state == SCHED_SHUTTING_DOWN); + + task->cap = cap; + + for (i = 0; i < 50; i++) { + IF_DEBUG(scheduler, sched_belch("shutting down capability %d, attempt %d", cap->no, i)); + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task) { + RELEASE_LOCK(&cap->lock); + IF_DEBUG(scheduler, sched_belch("not owner, yielding")); + yieldThread(); + continue; + } + cap->running_task = task; + if (!emptyRunQueue(cap) || cap->spare_workers) { + IF_DEBUG(scheduler, sched_belch("runnable threads or workers still alive, yielding")); + releaseCapability_(cap); // this will wake up a worker + RELEASE_LOCK(&cap->lock); + yieldThread(); + continue; + } + IF_DEBUG(scheduler, sched_belch("capability %d is stopped.", cap->no)); + RELEASE_LOCK(&cap->lock); + break; + } + // we now have the Capability, its run queue and spare workers + // list are both empty. +} + +/* ---------------------------------------------------------------------------- + * tryGrabCapability + * + * Attempt to gain control of a Capability if it is free. + * + * ------------------------------------------------------------------------- */ + +rtsBool +tryGrabCapability (Capability *cap, Task *task) +{ + if (cap->running_task != NULL) return rtsFalse; + ACQUIRE_LOCK(&cap->lock); + if (cap->running_task != NULL) { + RELEASE_LOCK(&cap->lock); + return rtsFalse; + } + task->cap = cap; + cap->running_task = task; + RELEASE_LOCK(&cap->lock); + return rtsTrue; +} + + +#endif /* THREADED_RTS */ + + |