Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 89
1 file changed, 41 insertions, 48 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 8002ac37dc..0444f0ca15 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -41,7 +41,8 @@
 #include "Timer.h"
 #include "ThreadPaused.h"
 #include "Messages.h"
-#include "Stable.h"
+#include "StablePtr.h"
+#include "StableName.h"
 #include "TopHandler.h"
 
 #if defined(HAVE_SYS_TYPES_H)
@@ -67,7 +68,7 @@
  * -------------------------------------------------------------------------- */
 
 #if !defined(THREADED_RTS)
-// Blocked/sleeping thrads
+// Blocked/sleeping threads
StgTSO *blocked_queue_hd = NULL;
 StgTSO *blocked_queue_tl = NULL;
 StgTSO *sleeping_queue = NULL;    // perhaps replace with a hash table?
@@ -151,11 +152,11 @@ static bool scheduleHandleThreadFinished( Capability *cap, Task *task,
 static bool scheduleNeedHeapProfile(bool ready_to_gc);
 static void scheduleDoGC(Capability **pcap, Task *task, bool force_major);
 
-static void deleteThread (Capability *cap, StgTSO *tso);
-static void deleteAllThreads (Capability *cap);
+static void deleteThread (StgTSO *tso);
+static void deleteAllThreads (void);
 
 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
-static void deleteThread_(Capability *cap, StgTSO *tso);
+static void deleteThread_(StgTSO *tso);
 #endif
 
 /* ---------------------------------------------------------------------------
@@ -180,9 +181,6 @@ schedule (Capability *initialCapability, Task *task)
   StgThreadReturnCode ret;
   uint32_t prev_what_next;
   bool ready_to_gc;
-#if defined(THREADED_RTS)
-  bool first = true;
-#endif
 
   cap = initialCapability;
 
@@ -271,7 +269,7 @@
         }
         break;
     default:
-        barf("sched_state: %d", sched_state);
+        barf("sched_state: %" FMT_Word, sched_state);
     }
 
     scheduleFindWork(&cap);
@@ -292,16 +290,6 @@
     // as a result of a console event having been delivered.
 
 #if defined(THREADED_RTS)
-    if (first)
-    {
-        // XXX: ToDo
-        //   // don't yield the first time, we want a chance to run this
-        //   // thread for a bit, even if there are others banging at the
-        //   // door.
-        //   first = false;
-        //   ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
-    }
-
     scheduleYield(&cap,task);
 
     if (emptyRunQueue(cap)) continue; // look for work again
@@ -360,7 +348,7 @@
     // in a foreign call returns.
     if (sched_state >= SCHED_INTERRUPTING &&
         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
-        deleteThread(cap,t);
+        deleteThread(t);
     }
 
     // If this capability is disabled, migrate the thread away rather
@@ -679,7 +667,11 @@ scheduleYield (Capability **pcap, Task *task)
     // otherwise yield (sleep), and keep yielding if necessary.
     do {
-        didGcLast = yieldCapability(&cap,task, !didGcLast);
+        if (doIdleGCWork(cap, false)) {
+            didGcLast = false;
+        } else {
+            didGcLast = yieldCapability(&cap,task, !didGcLast);
+        }
     }
     while (shouldYieldCapability(cap,task,didGcLast));
 
@@ -701,8 +693,6 @@ static void
 schedulePushWork(Capability *cap USED_IF_THREADS,
                  Task *task      USED_IF_THREADS)
 {
-    /* following code not for PARALLEL_HASKELL. I kept the call general,
-       future GUM versions might use pushing in a distributed setup */
 #if defined(THREADED_RTS)
 
     Capability *free_caps[n_capabilities], *cap0;
@@ -1263,7 +1253,7 @@ scheduleHandleThreadBlocked( StgTSO *t
  * -------------------------------------------------------------------------- */
 
 static bool
-scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
+scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
 {
     /* Need to check whether this was a main thread, and if so,
      * return with the return value.
@@ -1352,7 +1342,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
  * -------------------------------------------------------------------------- */
 
 static bool
-scheduleNeedHeapProfile( bool ready_to_gc STG_UNUSED )
+scheduleNeedHeapProfile( bool ready_to_gc )
 {
     // When we have +RTS -i0 and we're heap profiling, do a census at
     // every GC. This lets us get repeatable runs for debugging.
@@ -1738,10 +1728,8 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
         // they have stopped mutating and are standing by for GC.
         waitForGcThreads(cap, idle_cap);
 
-#if defined(THREADED_RTS)
         // Stable point where we can do a global check on our spark counters
         ASSERT(checkSparkCountInvariant());
-#endif
     }
 #endif
 
@@ -1756,7 +1744,7 @@ delete_threads_and_gc:
      * Checking for major_gc ensures that the last GC is major.
      */
    if (sched_state == SCHED_INTERRUPTING && major_gc) {
-        deleteAllThreads(cap);
+        deleteAllThreads();
 #if defined(THREADED_RTS)
         // Discard all the sparks from every Capability.  Why?
         // They'll probably be GC'd anyway since we've killed all the
@@ -1800,6 +1788,9 @@
     }
 #endif
 
+    // Do any remaining idle GC work from the previous GC
+    doIdleGCWork(cap, true /* all of it */);
+
 #if defined(THREADED_RTS)
     // reset pending_sync *before* GC, so that when the GC threads
     // emerge they don't immediately re-enter the GC.
@@ -1809,6 +1800,11 @@
     GarbageCollect(collect_gen, heap_census, 0, cap, NULL);
 #endif
 
+    // If we're shutting down, don't leave any idle GC work to do.
+    if (sched_state == SCHED_SHUTTING_DOWN) {
+        doIdleGCWork(cap, true /* all of it */);
+    }
+
     traceSparkCounters(cap);
 
     switch (recent_activity) {
@@ -1920,13 +1916,6 @@
             throwToSelf(cap, main_thread, heapOverflow_closure);
         }
     }
-#if defined(SPARKBALANCE)
-    /* JB
-       Once we are all together... this would be the place to balance all
-       spark pools. No concurrent stealing or adding of new sparks can
-       occur. Should be defined in Sparks.c. */
-    balanceSparkPoolsCaps(n_capabilities, capabilities);
-#endif
 
 #if defined(THREADED_RTS)
     stgFree(idle_cap);
@@ -1976,7 +1965,8 @@ forkProcess(HsStablePtr *entry
         // inconsistent state in the child.  See also #1391.
         ACQUIRE_LOCK(&sched_mutex);
         ACQUIRE_LOCK(&sm_mutex);
-        ACQUIRE_LOCK(&stable_mutex);
+        ACQUIRE_LOCK(&stable_ptr_mutex);
+        ACQUIRE_LOCK(&stable_name_mutex);
         ACQUIRE_LOCK(&task->lock);
 
         for (i=0; i < n_capabilities; i++) {
@@ -2001,18 +1991,20 @@
 
         RELEASE_LOCK(&sched_mutex);
         RELEASE_LOCK(&sm_mutex);
-        RELEASE_LOCK(&stable_mutex);
+        RELEASE_LOCK(&stable_ptr_mutex);
+        RELEASE_LOCK(&stable_name_mutex);
         RELEASE_LOCK(&task->lock);
 
+#if defined(THREADED_RTS)
+        /* N.B. releaseCapability_ below may need to take all_tasks_mutex */
+        RELEASE_LOCK(&all_tasks_mutex);
+#endif
+
         for (i=0; i < n_capabilities; i++) {
             releaseCapability_(capabilities[i],false);
             RELEASE_LOCK(&capabilities[i]->lock);
         }
 
-#if defined(THREADED_RTS)
-        RELEASE_LOCK(&all_tasks_mutex);
-#endif
-
         boundTaskExiting(task);
 
         // just return the pid
@@ -2023,7 +2015,8 @@
 #if defined(THREADED_RTS)
         initMutex(&sched_mutex);
         initMutex(&sm_mutex);
-        initMutex(&stable_mutex);
+        initMutex(&stable_ptr_mutex);
+        initMutex(&stable_name_mutex);
         initMutex(&task->lock);
 
         for (i=0; i < n_capabilities; i++) {
@@ -2049,7 +2042,7 @@
             // don't allow threads to catch the ThreadKilled
            // exception, but we do want to raiseAsync() because these
             // threads may be evaluating thunks that we need later.
-            deleteThread_(t->cap,t);
+            deleteThread_(t);
 
             // stop the GC from updating the InCall to point to
             // the TSO.  This is only necessary because the
@@ -2273,7 +2266,7 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
  * ------------------------------------------------------------------------- */
 
 static void
-deleteAllThreads ( Capability *cap )
+deleteAllThreads ()
 {
     // NOTE: only safe to call if we own all capabilities.
 
@@ -2284,7 +2277,7 @@ deleteAllThreads ( Capability *cap )
     for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
         for (t = generations[g].threads; t != END_TSO_QUEUE; t = next) {
             next = t->global_link;
-            deleteThread(cap,t);
+            deleteThread(t);
         }
     }
 
@@ -2795,7 +2788,7 @@ void wakeUpRts(void)
    -------------------------------------------------------------------------- */
 
 static void
-deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
+deleteThread (StgTSO *tso)
 {
     // NOTE: must only be called on a TSO that we have exclusive
     // access to, because we will call throwToSingleThreaded() below.
@@ -2810,7 +2803,7 @@ deleteThread (Capability *cap STG_UNUSED, StgTSO *tso)
 
 #if defined(FORKPROCESS_PRIMOP_SUPPORTED)
 static void
-deleteThread_(Capability *cap, StgTSO *tso)
+deleteThread_(StgTSO *tso)
 {
     // for forkProcess only:
     // like deleteThread(), but we delete threads in foreign calls, too.
@@ -2819,7 +2812,7 @@ deleteThread_(Capability *cap, StgTSO *tso)
         tso->what_next = ThreadKilled;
         appendToRunQueue(tso->cap, tso);
     } else {
-        deleteThread(cap,tso);
+        deleteThread(tso);
    }
 }
 #endif
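
Note on the scheduleYield() hunk above, which is the behavioural core of this diff: before going to sleep, the scheduler now first tries to run one bounded chunk of deferred ("idle") GC work via doIdleGCWork(cap, false), and only yields the capability when no chunk was available. Below is a minimal stand-alone sketch of that loop shape; do_idle_work(), yield_capability() and should_yield() are hypothetical stand-ins for the RTS's doIdleGCWork(), yieldCapability() and shouldYieldCapability(), not the real RTS API.

/* Sketch of the new scheduleYield() loop shape. Stand-in names only. */
#include <stdbool.h>
#include <stdio.h>

static int pending_chunks = 3;  /* deferred work left over from the last GC */
static int idle_rounds    = 0;  /* rounds where nothing was found to do */

/* Stand-in for doIdleGCWork(cap, false): run one bounded chunk of
 * deferred work, reporting whether a chunk was actually available. */
static bool do_idle_work(void)
{
    if (pending_chunks == 0) return false;
    pending_chunks--;
    printf("ran one idle chunk, %d left\n", pending_chunks);
    return true;
}

/* Stand-in for yieldCapability(): give the capability away and sleep;
 * the real function reports whether a GC ran while we were descheduled. */
static bool yield_capability(void)
{
    idle_rounds++;
    printf("no idle work; yielding capability\n");
    return false;
}

/* Stand-in for shouldYieldCapability(): here, stop after two idle rounds. */
static bool should_yield(void)
{
    return idle_rounds < 2;
}

int main(void)
{
    bool did_gc_last = false;
    /* Mirrors the do/while in scheduleYield(): prefer draining idle
     * work over sleeping; only yield when no work chunk was found. */
    do {
        if (do_idle_work()) {
            did_gc_last = false;
        } else {
            did_gc_last = yield_capability();
        }
    } while (should_yield());
    (void)did_gc_last;
    return 0;
}

The two doIdleGCWork(cap, true /* all of it */) calls added in scheduleDoGC() play the complementary role: per their comments, any chunks still pending are drained eagerly before the next collection starts, and again at shutdown so no idle work is left behind.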
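
A second note, on the forkProcess() hunk that moves RELEASE_LOCK(&all_tasks_mutex) above the releaseCapability_() loop: per the N.B. comment in the diff, releaseCapability_ may itself need to take all_tasks_mutex, so with a non-reentrant mutex the caller must drop that lock before the call or the thread deadlocks against itself. A tiny sketch of the hazard, with illustrative pthread names only (big_lock and helper() stand in for all_tasks_mutex and releaseCapability_(); this is not RTS code):

/* Sketch of the lock-ordering fix: drop a lock before calling a
 * function that may reacquire it. Compile with -pthread. */
#include <pthread.h>
#include <stdio.h>

static pthread_mutex_t big_lock = PTHREAD_MUTEX_INITIALIZER;

/* Stand-in for releaseCapability_(), which "may need to take
 * all_tasks_mutex" per the comment in the diff. */
static void helper(void)
{
    pthread_mutex_lock(&big_lock);   /* would self-deadlock if the caller
                                        still held big_lock */
    printf("helper acquired big_lock\n");
    pthread_mutex_unlock(&big_lock);
}

int main(void)
{
    pthread_mutex_lock(&big_lock);
    /* ... work that needs big_lock ... */
    pthread_mutex_unlock(&big_lock); /* release *before* the call, as the
                                        diff now releases all_tasks_mutex
                                        before the releaseCapability_ loop */
    helper();
    return 0;
}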