/* ---------------------------------------------------------------------------
 *
 * (c) The GHC Team, 2003-2012
 *
 * Capabilities
 *
 * A Capability represents the token required to execute STG code,
 * and all the state an OS thread/task needs to run Haskell code:
 * its STG registers, a pointer to its TSO, a nursery etc. During
 * STG execution, a pointer to the capability is kept in a
 * register (BaseReg; actually it is a pointer to cap->r).
 *
 * Only in a THREADED_RTS build will there be multiple capabilities,
 * for non-threaded builds there is only one global capability, namely
 * MainCapability.
 *
 * --------------------------------------------------------------------------*/

#include "PosixSource.h"
#include "Rts.h"

#include "Capability.h"
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
#include "sm/GC.h" // for gcWorkerThread()
#include "STM.h"
#include "RtsUtils.h"
#include "sm/OSMem.h"

#if !defined(mingw32_HOST_OS)
#include "rts/IOManager.h" // for setIOManagerControlFd()
#endif

// NOTE(review): the header name of this directive was lost (a bare
// "#include" does not preprocess); <string.h> is the presumed original --
// confirm against version control.
#include <string.h>

// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
Capability MainCapability;

// Number of entries in the capabilities[] array.
uint32_t n_capabilities = 0;
uint32_t enabled_capabilities = 0;

// The array of Capabilities.  It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
// (e.g. threads in waitForCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;

// Holds the Capability which last became free.  This is used so that
// an in-call has a chance of quickly finding a free Capability.
// Maintaining a global free list of Capabilities would require global
// locking, so we don't do that.
// There is one slot per NUMA node.
static Capability *last_free_capability[MAX_NUMA_NODES];

/*
 * Indicates that the RTS wants to synchronise all the Capabilities
 * for some reason.  All Capabilities should yieldCapability().
*/
PendingSync * volatile pending_sync = 0;

// Number of logical NUMA nodes
uint32_t n_numa_nodes;

// Map logical NUMA node to OS node numbers
uint32_t numa_map[MAX_NUMA_NODES];

/* Let foreign code get the current Capability -- assuming there is one!
 * This is useful for unsafe foreign calls because they are called with
 * the current Capability held, but they are not passed it. For example,
 * see the integer-gmp package which calls allocate() in its
 * stgAllocForGMP() function (which gets called by gmp functions).
 *
 * In a THREADED_RTS build this returns myTask()->cap; otherwise it
 * returns the address of MainCapability.
 * */
Capability *
rts_unsafeGetMyCapability (void)
{
#if defined(THREADED_RTS)
  return myTask()->cap;
#else
  return &MainCapability;
#endif
}

#if defined(THREADED_RTS)
/* True when the scheduler state is SCHED_INTERRUPTING or later, or when
 * recent_activity is ACTIVITY_INACTIVE.
 */
STATIC_INLINE bool
globalWorkToDo (void)
{
    return sched_state >= SCHED_INTERRUPTING
        || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif

#if defined(THREADED_RTS)
/* Try to obtain a spark for 'cap' to run.
 *
 * Returns 0 immediately if cap's run queue is non-empty or it has
 * returning tasks.  Otherwise it first takes from cap's own pool, then
 * visits the other capabilities in index order.  Fizzled sparks are
 * discarded and counted against cap->spark_stats.fizzled.  The whole
 * search is repeated while some pool was observed non-empty after a
 * failed attempt.  Returns NULL if nothing was obtained.
 */
StgClosure *
findSpark (Capability *cap)
{
  Capability *robbed;
  StgClosurePtr spark;
  bool retry;
  uint32_t i = 0;

  if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
      // If there are other threads, don't try to run any new
      // sparks: sparks might be speculative, we don't want to take
      // resources away from the main computation.
      return 0;
  }

  do {
      retry = false;

      // first try to get a spark from our own pool.
      // We should be using reclaimSpark(), because it works without
      // needing any atomic instructions:
      //   spark = reclaimSpark(cap->sparks);
      // However, measurements show that this makes at least one benchmark
      // slower (prsa) and doesn't affect the others.
      spark = tryStealSpark(cap->sparks);
      while (spark != NULL && fizzledSpark(spark)) {
          cap->spark_stats.fizzled++;
          traceEventSparkFizzle(cap);
          spark = tryStealSpark(cap->sparks);
      }
      if (spark != NULL) {
          cap->spark_stats.converted++;

          // Post event for running a spark from capability's own pool.
          traceEventSparkRun(cap);

          return spark;
      }
      if (!emptySparkPoolCap(cap)) {
          // our own pool still has entries although the steal failed
          retry = true;
      }

      if (n_capabilities == 1) { return NULL; } // makes no sense...

      debugTrace(DEBUG_sched,
                 "cap %d: Trying to steal work from other capabilities",
                 cap->no);

      /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
      start at a random place instead of 0 as well.  */
      for ( i=0 ; i < n_capabilities ; i++ ) {
          robbed = capabilities[i];
          if (cap == robbed)  // ourselves...
              continue;

          if (emptySparkPoolCap(robbed)) // nothing to steal here
              continue;

          spark = tryStealSpark(robbed->sparks);
          while (spark != NULL && fizzledSpark(spark)) {
              // fizzled sparks are charged to the thief, not the victim
              cap->spark_stats.fizzled++;
              traceEventSparkFizzle(cap);
              spark = tryStealSpark(robbed->sparks);
          }
          if (spark == NULL && !emptySparkPoolCap(robbed)) {
              // we conflicted with another thread while trying to steal;
              // try again later.
              retry = true;
          }

          if (spark != NULL) {
              cap->spark_stats.converted++;
              traceEventSparkSteal(cap, robbed->no);

              return spark;
          }
          // otherwise: no success, try next one
      }
  } while (retry);

  debugTrace(DEBUG_sched, "No sparks stolen");
  return NULL;
}

// Returns True if any spark pool is non-empty at this moment in time
// The result is only valid for an instant, of course, so in a sense
// is immediately invalid, and should not be relied upon for
// correctness.
bool
anySparks (void)
{
    uint32_t i;

    for (i=0; i < n_capabilities; i++) {
        if (!emptySparkPoolCap(capabilities[i])) {
            return true;
        }
    }
    return false;
}
#endif

/* -----------------------------------------------------------------------------
 * Manage the returning_tasks lists.
*
 * These functions require cap->lock
 * -------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
/* Append 'task' to the tail of cap's returning_tasks queue and bump the
 * queue length.  The task must not already be linked (task->next == NULL).
 */
STATIC_INLINE void
newReturningTask (Capability *cap, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->next == NULL);

    if (!cap->returning_tasks_hd) {
        // empty queue: the new task becomes the head
        cap->returning_tasks_hd = task;
    } else {
        // non-empty queue: link after the current tail
        ASSERT(cap->returning_tasks_tl->next == NULL);
        cap->returning_tasks_tl->next = task;
    }
    cap->returning_tasks_tl = task;
    cap->n_returning_tasks++;

    ASSERT_RETURNING_TASKS(cap,task);
}

/* Unlink and return the Task at the head of cap's returning_tasks queue.
 * The queue must be non-empty.
 */
STATIC_INLINE Task *
popReturningTask (Capability *cap)
{
    ASSERT_LOCK_HELD(&cap->lock);

    Task *head = cap->returning_tasks_hd;
    ASSERT(head);

    cap->returning_tasks_hd = head->next;
    if (cap->returning_tasks_hd == NULL) {
        // that was the last element, so there is no tail any more
        cap->returning_tasks_tl = NULL;
    }
    head->next = NULL;
    cap->n_returning_tasks--;

    ASSERT_RETURNING_TASKS(cap,head);
    return head;
}
#endif

/* ----------------------------------------------------------------------------
 * Initialisation
 *
 * The Capability is initially marked not free.
* ------------------------------------------------------------------------- */ static void initCapability (Capability *cap, uint32_t i) { uint32_t g; cap->no = i; cap->node = capNoToNumaNode(i); cap->in_haskell = false; cap->idle = 0; cap->disabled = false; cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; cap->n_run_queue = 0; #if defined(THREADED_RTS) initMutex(&cap->lock); cap->running_task = NULL; // indicates cap is free cap->spare_workers = NULL; cap->n_spare_workers = 0; cap->suspended_ccalls = NULL; cap->n_suspended_ccalls = 0; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; cap->n_returning_tasks = 0; cap->inbox = (Message*)END_TSO_QUEUE; cap->putMVars = NULL; cap->sparks = allocSparkPool(); cap->spark_stats.created = 0; cap->spark_stats.dud = 0; cap->spark_stats.overflowed = 0; cap->spark_stats.converted = 0; cap->spark_stats.gcd = 0; cap->spark_stats.fizzled = 0; #if !defined(mingw32_HOST_OS) cap->io_manager_control_wr_fd = -1; #endif #endif cap->total_allocated = 0; cap->f.stgEagerBlackholeInfo = (W_)&__stg_EAGER_BLACKHOLE_info; cap->f.stgGCEnter1 = (StgFunPtr)__stg_gc_enter_1; cap->f.stgGCFun = (StgFunPtr)__stg_gc_fun; cap->mut_lists = stgMallocBytes(sizeof(bdescr *) * RtsFlags.GcFlags.generations, "initCapability"); cap->saved_mut_lists = stgMallocBytes(sizeof(bdescr *) * RtsFlags.GcFlags.generations, "initCapability"); for (g = 0; g < RtsFlags.GcFlags.generations; g++) { cap->mut_lists[g] = NULL; } cap->weak_ptr_list_hd = NULL; cap->weak_ptr_list_tl = NULL; cap->free_tvar_watch_queues = END_STM_WATCH_QUEUE; cap->free_invariant_check_queues = END_INVARIANT_CHECK_QUEUE; cap->free_trec_chunks = END_STM_CHUNK_LIST; cap->free_trec_headers = NO_TREC; cap->transaction_tokens = 0; cap->context_switch = 0; cap->pinned_object_block = NULL; cap->pinned_object_blocks = NULL; #ifdef PROFILING cap->r.rCCCS = CCS_SYSTEM; #else cap->r.rCCCS = NULL; #endif // cap->r.rCurrentTSO is charged for calls to allocate(), so we // don't 
want it set when not running a Haskell thread. cap->r.rCurrentTSO = NULL; traceCapCreate(cap); traceCapsetAssignCap(CAPSET_OSPROCESS_DEFAULT, i); traceCapsetAssignCap(CAPSET_CLOCKDOMAIN_DEFAULT, i); #if defined(THREADED_RTS) traceSparkCounters(cap); #endif } /* --------------------------------------------------------------------------- * Function: initCapabilities() * * Purpose: set up the Capability handling. For the THREADED_RTS build, * we keep a table of them, the size of which is * controlled by the user via the RTS flag -N. * * ------------------------------------------------------------------------- */ void initCapabilities (void) { uint32_t i; /* Declare a couple capability sets representing the process and clock domain. Each capability will get added to these capsets. */ traceCapsetCreate(CAPSET_OSPROCESS_DEFAULT, CapsetTypeOsProcess); traceCapsetCreate(CAPSET_CLOCKDOMAIN_DEFAULT, CapsetTypeClockdomain); // Initialise NUMA if (!RtsFlags.GcFlags.numa) { n_numa_nodes = 1; for (i = 0; i < MAX_NUMA_NODES; i++) { numa_map[i] = 0; } } else { uint32_t nNodes = osNumaNodes(); if (nNodes > MAX_NUMA_NODES) { barf("Too many NUMA nodes (max %d)", MAX_NUMA_NODES); } StgWord mask = RtsFlags.GcFlags.numaMask & osNumaMask(); uint32_t logical = 0, physical = 0; for (; physical < MAX_NUMA_NODES; physical++) { if (mask & 1) { numa_map[logical++] = physical; } mask = mask >> 1; } n_numa_nodes = logical; if (logical == 0) { barf("%s: available NUMA node set is empty"); } } #if defined(THREADED_RTS) #ifndef REG_Base // We can't support multiple CPUs if BaseReg is not a register if (RtsFlags.ParFlags.nCapabilities > 1) { errorBelch("warning: multiple CPUs not supported in this build, reverting to 1"); RtsFlags.ParFlags.nCapabilities = 1; } #endif n_capabilities = 0; moreCapabilities(0, RtsFlags.ParFlags.nCapabilities); n_capabilities = RtsFlags.ParFlags.nCapabilities; #else /* !THREADED_RTS */ n_capabilities = 1; capabilities = stgMallocBytes(sizeof(Capability*), 
"initCapabilities"); capabilities[0] = &MainCapability; initCapability(&MainCapability, 0); #endif enabled_capabilities = n_capabilities; // There are no free capabilities to begin with. We will start // a worker Task to each Capability, which will quickly put the // Capability on the free list when it finds nothing to do. for (i = 0; i < n_numa_nodes; i++) { last_free_capability[i] = capabilities[0]; } } void moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS) { #if defined(THREADED_RTS) uint32_t i; Capability **old_capabilities = capabilities; capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities"); if (to == 1) { // THREADED_RTS must work on builds that don't have a mutable // BaseReg (eg. unregisterised), so in this case // capabilities[0] must coincide with &MainCapability. capabilities[0] = &MainCapability; initCapability(&MainCapability, 0); } else { for (i = 0; i < to; i++) { if (i < from) { capabilities[i] = old_capabilities[i]; } else { capabilities[i] = stgMallocBytes(sizeof(Capability), "moreCapabilities"); initCapability(capabilities[i], i); } } } debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from); if (old_capabilities != NULL) { stgFree(old_capabilities); } #endif } /* ---------------------------------------------------------------------------- * setContextSwitches: cause all capabilities to context switch as * soon as possible. * ------------------------------------------------------------------------- */ void contextSwitchAllCapabilities(void) { uint32_t i; for (i=0; i < n_capabilities; i++) { contextSwitchCapability(capabilities[i]); } } void interruptAllCapabilities(void) { uint32_t i; for (i=0; i < n_capabilities; i++) { interruptCapability(capabilities[i]); } } /* ---------------------------------------------------------------------------- * Give a Capability to a Task. The task must currently be sleeping * on its condition variable. 
*
 * Requires cap->lock (modifies cap->running_task).
 *
 * When migrating a Task, the migrater must take task->lock before
 * modifying task->cap, to synchronise with the waking up Task.
 * Additionally, the migrater should own the Capability (when
 * migrating the run queue), or cap->lock (when migrating
 * returning_workers).
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
static void
giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
{
    ASSERT_LOCK_HELD(&cap->lock);
    ASSERT(task->cap == cap);
    debugTrace(DEBUG_sched, "passing capability %d to %s %#" FMT_HexWord64,
               cap->no, task->incall->tso ? "bound task" : "worker",
               serialisableTaskId(task));
    ACQUIRE_LOCK(&task->lock);
    if (task->wakeup == false) {
        task->wakeup = true;
        // the wakeup flag is needed because signalCondition() doesn't
        // flag the condition if the thread is already running, but we want
        // it to be sticky.
        signalCondition(&task->cond);
    }
    RELEASE_LOCK(&task->lock);
}
#endif

/* ----------------------------------------------------------------------------
 * releaseCapability
 *
 * The current Task (cap->task) releases the Capability.  The Capability is
 * marked free, and if there is any work to do, an appropriate Task is woken up.
 *
 * releaseCapability_() does not take cap->lock itself; the wrappers
 * releaseCapability() and releaseAndWakeupCapability() acquire it around
 * the call.  The checks below are tried in order and the first one that
 * hands the Capability to somebody returns immediately.
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)
void
releaseCapability_ (Capability* cap,
                    bool always_wakeup)
{
    Task *task;

    task = cap->running_task;

    ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
    ASSERT_RETURNING_TASKS(cap,task);

    cap->running_task = NULL;

    // Check to see whether a worker thread can be given
    // the go-ahead to return the result of an external call..
    if (cap->n_returning_tasks != 0) {
        giveCapabilityToTask(cap,cap->returning_tasks_hd);
        // The Task pops itself from the queue (see waitForCapability())
        return;
    }

    // If there is a pending sync, then we should just leave the Capability
    // free.  The thread trying to sync will be about to call
    // waitForCapability().
    //
    // Note: this is *after* we check for a returning task above,
    // because the task attempting to acquire all the capabilities may
    // be currently in waitForCapability() waiting for this
    // capability, in which case simply setting it as free would not
    // wake up the waiting task.
    PendingSync *sync = pending_sync;
    if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
        debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
        return;
    }

    // If the next thread on the run queue is a bound thread,
    // give this Capability to the appropriate Task.
    if (!emptyRunQueue(cap) && peekRunQueue(cap)->bound) {
        // Make sure we're not about to try to wake ourselves up
        // ASSERT(task != cap->run_queue_hd->bound);
        // assertion is false: in schedule() we force a yield after
        // ThreadBlocked, but the thread may be back on the run queue
        // by now.
        task = peekRunQueue(cap)->bound->task;
        giveCapabilityToTask(cap, task);
        return;
    }

    if (!cap->spare_workers) {
        // Create a worker thread if we don't have one.  If the system
        // is interrupted, we only create a worker task if there
        // are threads that need to be completed.  If the system is
        // shutting down, we never create a new worker.
        if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
            debugTrace(DEBUG_sched,
                       "starting new worker on capability %d", cap->no);
            startWorkerTask(cap);
            return;
        }
    }

    // If we have an unbound thread on the run queue, or if there's
    // anything else to do, give the Capability to a worker thread.
    if (always_wakeup ||
        !emptyRunQueue(cap) || !emptyInbox(cap) ||
        (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
        if (cap->spare_workers) {
            giveCapabilityToTask(cap, cap->spare_workers);
            // The worker Task pops itself from the queue;
            return;
        }
    }

    // Nobody was woken: record this Capability as the most recently
    // freed one for its NUMA node.
#ifdef PROFILING
    cap->r.rCCCS = CCS_IDLE;
#endif
    last_free_capability[cap->node] = cap;
    debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}

/* Take cap->lock and release the Capability without forcing a wakeup. */
void
releaseCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, false);
    RELEASE_LOCK(&cap->lock);
}

/* Take cap->lock and release the Capability with always_wakeup set. */
void
releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
{
    ACQUIRE_LOCK(&cap->lock);
    releaseCapability_(cap, true);
    RELEASE_LOCK(&cap->lock);
}

/* Push cap->running_task onto the front of cap->spare_workers, unless
 * MAX_SPARE_WORKERS are already queued; in that case the Capability is
 * released, the worker Task is stopped, cap->lock is released and
 * shutdownThread() is called.
 *
 * The only caller in this file (yieldCapability) holds cap->lock.
 */
static void
enqueueWorker (Capability* cap USED_IF_THREADS)
{
    Task *task;

    task = cap->running_task;

    // If the Task is stopped, we shouldn't be yielding, we should
    // be just exiting.
    ASSERT(!task->stopped);
    ASSERT(task->worker);

    if (cap->n_spare_workers < MAX_SPARE_WORKERS)
    {
        task->next = cap->spare_workers;
        cap->spare_workers = task;
        cap->n_spare_workers++;
    }
    else
    {
        debugTrace(DEBUG_sched, "%d spare workers already, exiting",
                   cap->n_spare_workers);
        releaseCapability_(cap,false);
        // hold the lock until after workerTaskStop; c.f. scheduleWorker()
        workerTaskStop(task);
        RELEASE_LOCK(&cap->lock);
        shutdownThread();
    }
}

#endif

/* ----------------------------------------------------------------------------
 * waitForWorkerCapability(task)
 *
 * waits to be given a Capability, and then returns the Capability.  The task
 * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
* ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

static Capability * waitForWorkerCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);

        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task != NULL) {
            debugTrace(DEBUG_sched,
                       "capability %d is owned by another task", cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->cap != cap) {
            // see Note [migrated bound threads]
            debugTrace(DEBUG_sched,
                       "task has been migrated to cap %d", task->cap->no);
            RELEASE_LOCK(&cap->lock);
            continue;
        }

        if (task->incall->tso == NULL) {
            // no TSO attached to the in-call: we are on the
            // spare_workers queue and must remove ourselves from it
            ASSERT(cap->spare_workers != NULL);
            // if we're not at the front of the queue, release it
            // again.  This is unlikely to happen.
            if (cap->spare_workers != task) {
                giveCapabilityToTask(cap,cap->spare_workers);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->spare_workers = task->next;
            task->next = NULL;
            cap->n_spare_workers--;
        }

        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
        break;
    }

    return cap;
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForReturnCapability (Task *task)
 *
 * The Task should be on the cap->returning_tasks queue of a Capability.  This
 * function waits for the Task to be woken up, and returns the Capability that
 * it was woken up on.
 *
 * ------------------------------------------------------------------------- */

#if defined(THREADED_RTS)

static Capability * waitForReturnCapability (Task *task)
{
    Capability *cap;

    for (;;) {
        ACQUIRE_LOCK(&task->lock);
        // task->lock held, cap->lock not held
        if (!task->wakeup) waitCondition(&task->cond, &task->lock);
        cap = task->cap;
        task->wakeup = false;
        RELEASE_LOCK(&task->lock);

        // now check whether we should wake up...
        ACQUIRE_LOCK(&cap->lock);
        if (cap->running_task == NULL) {
            if (cap->returning_tasks_hd != task) {
                // not our turn: pass the wakeup on to the queue head
                giveCapabilityToTask(cap,cap->returning_tasks_hd);
                RELEASE_LOCK(&cap->lock);
                continue;
            }
            cap->running_task = task;
            popReturningTask(cap);
            RELEASE_LOCK(&cap->lock);
            break;
        }
        // the Capability is still owned by somebody; wait again
        RELEASE_LOCK(&cap->lock);
    }

    return cap;
}

#endif /* THREADED_RTS */

/* ----------------------------------------------------------------------------
 * waitForCapability (Capability **pCap, Task *task)
 *
 * Purpose:  when an OS thread returns from an external call,
 * it calls waitForCapability() (via Schedule.resumeThread())
 * to wait for permission to enter the RTS & communicate the
 * result of the external call back to the Haskell thread that
 * made it.
 *
 * If *pCap is NULL on entry a Capability is chosen for the Task
 * (preferred capability, then last_free_capability for the Task's node,
 * then any free capability on that node).  On return *pCap is the
 * Capability now held by the Task.
 *
 * ------------------------------------------------------------------------- */

void waitForCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)

    MainCapability.running_task = task;
    task->cap = &MainCapability;
    *pCap = &MainCapability;

#else
    uint32_t i;
    Capability *cap = *pCap;

    if (cap == NULL) {
        if (task->preferred_capability != -1) {
            cap = capabilities[task->preferred_capability %
                               enabled_capabilities];
        } else {
            // Try last_free_capability first
            cap = last_free_capability[task->node];
            if (cap->running_task) {
                // Otherwise, search for a free capability on this node.
                cap = NULL;
                for (i = task->node; i < enabled_capabilities;
                      i += n_numa_nodes) {
                    // visits all the capabilities on this node, because
                    // cap[i]->node == i % n_numa_nodes
                    if (!capabilities[i]->running_task) {
                        cap = capabilities[i];
                        break;
                    }
                }
                if (cap == NULL) {
                    // Can't find a free one, use last_free_capability.
                    cap = last_free_capability[task->node];
                }
            }
        }

        // record the Capability as the one this Task is now associated with.
        task->cap = cap;

    } else {
        ASSERT(task->cap == cap);
    }

    debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);

    ACQUIRE_LOCK(&cap->lock);
    if (!cap->running_task) {
        // It's free; just grab it
        cap->running_task = task;
        RELEASE_LOCK(&cap->lock);
    } else {
        // Busy: queue up as a returning task and sleep until it is our turn
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);

    *pCap = cap;
#endif
}

/* ----------------------------------------------------------------------------
 * yieldCapability
 *
 * Give up the Capability, and return when we have it again.  This is called
 * when either we know that the Capability should be given to another Task, or
 * there is nothing to do right now.  One of the following is true:
 *
 *   - The current Task is a worker, and there's a bound thread at the head of
 *     the run queue (or vice versa)
 *
 *   - The run queue is empty.  We'll be woken up again when there's work to
 *     do.
 *
 *   - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
 *     We should become a GC worker for a while.
 *
 *   - Another Task is trying to acquire all the Capabilities (pending_sync !=
 *     SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
 *     setNumCapabilities.  We should give up the Capability temporarily.
 *
 * Returns true if this call acted as a GC worker and still holds the same
 * Capability afterwards; otherwise returns false after the Capability has
 * been given up and re-acquired, with *pCap updated to the Capability now
 * held.
 *
 * ------------------------------------------------------------------------- */

#if defined (THREADED_RTS)

/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
   and return the bool */
bool /* Did we GC? */
yieldCapability (Capability** pCap, Task *task, bool gcAllowed)
{
    Capability *cap = *pCap;

    if (gcAllowed)
    {
        PendingSync *sync = pending_sync;

        if (sync && sync->type == SYNC_GC_PAR) {
            if (! sync->idle[cap->no]) {
                traceEventGcStart(cap);
                gcWorkerThread(cap);
                traceEventGcEnd(cap);
                traceSparkCounters(cap);
                // See Note [migrated bound threads 2]
                if (task->cap == cap) {
                    return true;
                }
            }
        }
    }

    debugTrace(DEBUG_sched, "giving up capability %d", cap->no);

    // We must now release the capability and wait to be woken up again.
    task->wakeup = false;

    ACQUIRE_LOCK(&cap->lock);

    // If this is a worker thread, put it on the spare_workers queue
    if (isWorker(task)) {
        enqueueWorker(cap);
    }

    releaseCapability_(cap, false);

    if (isWorker(task) || isBoundTask(task)) {
        RELEASE_LOCK(&cap->lock);
        cap = waitForWorkerCapability(task);
    } else {
        // Not a worker Task, or a bound Task.  The only way we can be woken up
        // again is to put ourselves on the returning_tasks queue, so that's
        // what we do. We still hold cap->lock at this point
        // The Task waiting for this Capability does not have it
        // yet, so we can be sure to be woken up later. (see #10545)
        newReturningTask(cap,task);
        RELEASE_LOCK(&cap->lock);
        cap = waitForReturnCapability(task);
    }

    debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
    ASSERT(cap->running_task == task);

#ifdef PROFILING
    cap->r.rCCCS = CCS_SYSTEM;
#endif

    *pCap = cap;

    ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);

    return false;
}

#endif /* THREADED_RTS */

// Note [migrated bound threads]
//
// There's a tricky case where:
//    - cap A is running an unbound thread T1
//    - there is a bound thread T2 at the head of the run queue on cap A
//    - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
//    - T1 returns quickly grabbing A again (T2 is still waking up on A)
//    - T1 blocks, the scheduler migrates T2 to cap B
//    - the task bound to T2 wakes up on cap B
//
// We take advantage of the following invariant:
//
//  - A bound thread can only be migrated by the holder of the
//    Capability on which the bound thread currently lives.  So, if we
//    hold Capability C, and task->cap == C, then task cannot be
//    migrated under our feet.
// Note [migrated bound threads 2] // // Second tricky case; // - A bound Task becomes a GC thread // - scheduleDoGC() migrates the thread belonging to this Task, // because the Capability it is on is disabled // - after GC, gcWorkerThread() returns, but now we are // holding a Capability that is not the same as task->cap // - Hence we must check for this case and immediately give up the // cap we hold. /* ---------------------------------------------------------------------------- * prodCapability * * If a Capability is currently idle, wake up a Task on it. Used to * get every Capability into the GC. * ------------------------------------------------------------------------- */ #if defined (THREADED_RTS) void prodCapability (Capability *cap, Task *task) { ACQUIRE_LOCK(&cap->lock); if (!cap->running_task) { cap->running_task = task; releaseCapability_(cap,true); } RELEASE_LOCK(&cap->lock); } #endif /* THREADED_RTS */ /* ---------------------------------------------------------------------------- * tryGrabCapability * * Attempt to gain control of a Capability if it is free. * * ------------------------------------------------------------------------- */ #if defined (THREADED_RTS) bool tryGrabCapability (Capability *cap, Task *task) { int r; if (cap->running_task != NULL) return false; r = TRY_ACQUIRE_LOCK(&cap->lock); if (r != 0) return false; if (cap->running_task != NULL) { RELEASE_LOCK(&cap->lock); return false; } task->cap = cap; cap->running_task = task; RELEASE_LOCK(&cap->lock); return true; } #endif /* THREADED_RTS */ /* ---------------------------------------------------------------------------- * shutdownCapability * * At shutdown time, we want to let everything exit as cleanly as * possible. For each capability, we let its run queue drain, and * allow the workers to stop. 
* * This function should be called when interrupted and * sched_state = SCHED_SHUTTING_DOWN, thus any worker that wakes up * will exit the scheduler and call taskStop(), and any bound thread * that wakes up will return to its caller. Runnable threads are * killed. * * ------------------------------------------------------------------------- */ static void shutdownCapability (Capability *cap USED_IF_THREADS, Task *task USED_IF_THREADS, bool safe USED_IF_THREADS) { #if defined(THREADED_RTS) uint32_t i; task->cap = cap; // Loop indefinitely until all the workers have exited and there // are no Haskell threads left. We used to bail out after 50 // iterations of this loop, but that occasionally left a worker // running which caused problems later (the closeMutex() below // isn't safe, for one thing). for (i = 0; /* i < 50 */; i++) { ASSERT(sched_state == SCHED_SHUTTING_DOWN); debugTrace(DEBUG_sched, "shutting down capability %d, attempt %d", cap->no, i); ACQUIRE_LOCK(&cap->lock); if (cap->running_task) { RELEASE_LOCK(&cap->lock); debugTrace(DEBUG_sched, "not owner, yielding"); yieldThread(); continue; } cap->running_task = task; if (cap->spare_workers) { // Look for workers that have died without removing // themselves from the list; this could happen if the OS // summarily killed the thread, for example. This // actually happens on Windows when the system is // terminating the program, and the RTS is running in a // DLL. 
Task *t, *prev; prev = NULL; for (t = cap->spare_workers; t != NULL; t = t->next) { if (!osThreadIsAlive(t->id)) { debugTrace(DEBUG_sched, "worker thread %p has died unexpectedly", (void *)(size_t)t->id); cap->n_spare_workers--; if (!prev) { cap->spare_workers = t->next; } else { prev->next = t->next; } prev = t; } } } if (!emptyRunQueue(cap) || cap->spare_workers) { debugTrace(DEBUG_sched, "runnable threads or workers still alive, yielding"); releaseCapability_(cap,false); // this will wake up a worker RELEASE_LOCK(&cap->lock); yieldThread(); continue; } // If "safe", then busy-wait for any threads currently doing // foreign calls. If we're about to unload this DLL, for // example, we need to be sure that there are no OS threads // that will try to return to code that has been unloaded. // We can be a bit more relaxed when this is a standalone // program that is about to terminate, and let safe=false. if (cap->suspended_ccalls && safe) { debugTrace(DEBUG_sched, "thread(s) are involved in foreign calls, yielding"); cap->running_task = NULL; RELEASE_LOCK(&cap->lock); // The IO manager thread might have been slow to start up, // so the first attempt to kill it might not have // succeeded. Just in case, try again - the kill message // will only be sent once. // // To reproduce this deadlock: run ffi002(threaded1) // repeatedly on a loaded machine. ioManagerDie(); yieldThread(); continue; } traceSparkCounters(cap); RELEASE_LOCK(&cap->lock); break; } // we now have the Capability, its run queue and spare workers // list are both empty. // ToDo: we can't drop this mutex, because there might still be // threads performing foreign calls that will eventually try to // return via resumeThread() and attempt to grab cap->lock. 
// closeMutex(&cap->lock); #endif } void shutdownCapabilities(Task *task, bool safe) { uint32_t i; for (i=0; i < n_capabilities; i++) { ASSERT(task->incall->tso == NULL); shutdownCapability(capabilities[i], task, safe); } #if defined(THREADED_RTS) ASSERT(checkSparkCountInvariant()); #endif } static void freeCapability (Capability *cap) { stgFree(cap->mut_lists); stgFree(cap->saved_mut_lists); #if defined(THREADED_RTS) freeSparkPool(cap->sparks); #endif traceCapsetRemoveCap(CAPSET_OSPROCESS_DEFAULT, cap->no); traceCapsetRemoveCap(CAPSET_CLOCKDOMAIN_DEFAULT, cap->no); traceCapDelete(cap); } void freeCapabilities (void) { #if defined(THREADED_RTS) uint32_t i; for (i=0; i < n_capabilities; i++) { freeCapability(capabilities[i]); if (capabilities[i] != &MainCapability) stgFree(capabilities[i]); } #else freeCapability(&MainCapability); #endif stgFree(capabilities); traceCapsetDelete(CAPSET_OSPROCESS_DEFAULT); traceCapsetDelete(CAPSET_CLOCKDOMAIN_DEFAULT); } /* --------------------------------------------------------------------------- Mark everything directly reachable from the Capabilities. When using multiple GC threads, each GC thread marks all Capabilities for which (c `mod` n == 0), for Capability c and thread n. ------------------------------------------------------------------------ */ void markCapability (evac_fn evac, void *user, Capability *cap, bool no_mark_sparks USED_IF_THREADS) { InCall *incall; // Each GC thread is responsible for following roots from the // Capability of the same number. There will usually be the same // or fewer Capabilities as GC threads, but just in case there // are more, we mark every Capability whose number is the GC // thread's index plus a multiple of the number of GC threads. 
evac(user, (StgClosure **)(void *)&cap->run_queue_hd); evac(user, (StgClosure **)(void *)&cap->run_queue_tl); #if defined(THREADED_RTS) evac(user, (StgClosure **)(void *)&cap->inbox); #endif for (incall = cap->suspended_ccalls; incall != NULL; incall=incall->next) { evac(user, (StgClosure **)(void *)&incall->suspended_tso); } #if defined(THREADED_RTS) if (!no_mark_sparks) { traverseSparkQueue (evac, user, cap); } #endif // Free STM structures for this Capability stmPreGCHook(cap); } void markCapabilities (evac_fn evac, void *user) { uint32_t n; for (n = 0; n < n_capabilities; n++) { markCapability(evac, user, capabilities[n], false); } } #if defined(THREADED_RTS) bool checkSparkCountInvariant (void) { SparkCounters sparks = { 0, 0, 0, 0, 0, 0 }; StgWord64 remaining = 0; uint32_t i; for (i = 0; i < n_capabilities; i++) { sparks.created += capabilities[i]->spark_stats.created; sparks.dud += capabilities[i]->spark_stats.dud; sparks.overflowed+= capabilities[i]->spark_stats.overflowed; sparks.converted += capabilities[i]->spark_stats.converted; sparks.gcd += capabilities[i]->spark_stats.gcd; sparks.fizzled += capabilities[i]->spark_stats.fizzled; remaining += sparkPoolSize(capabilities[i]->sparks); } /* The invariant is * created = converted + remaining + gcd + fizzled */ debugTrace(DEBUG_sparks,"spark invariant: %ld == %ld + %ld + %ld + %ld " "(created == converted + remaining + gcd + fizzled)", sparks.created, sparks.converted, remaining, sparks.gcd, sparks.fizzled); return (sparks.created == sparks.converted + remaining + sparks.gcd + sparks.fizzled); } #endif #if !defined(mingw32_HOST_OS) void setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) { #if defined(THREADED_RTS) if (cap_no < n_capabilities) { capabilities[cap_no]->io_manager_control_wr_fd = fd; } else { errorBelch("warning: setIOManagerControlFd called with illegal capability number."); } #endif } #endif