diff options
author | Douglas Wilson <douglas.wilson@gmail.com> | 2020-12-15 09:56:02 +0000 |
---|---|---|
committer | Marge Bot <ben+marge-bot@smart-cactus.org> | 2021-01-17 05:49:54 -0500 |
commit | f395c2cb2e6bb0b9f9f3d6deb923c72fe7433d37 (patch) | |
tree | 574bcd8e8c1637cc858e24f1fc288853505d13bb /rts | |
parent | 971a88a78105b207fb1e2e2614877fd080d2ccd1 (diff) | |
download | haskell-f395c2cb2e6bb0b9f9f3d6deb923c72fe7433d37.tar.gz |
rts: gc: use mutex+condvar instead of sched_yield in gc main loop
Here we remove the schedYield loop in scavenge_until_all_done+any_work, replacing
it with a single mutex + condition variable.
Previously any_work would check todo_large_objects, todo_q,
todo_overflow of each gen for work. Comments explained that this was
checking global work in any gen. However, these must have been out of
date, because all of these locations are local to a gc thread.
We've eliminated any_work entirely, instead simply looping back into
scavenge_loop, which will quickly return if there is no work.
shutdown_gc_threads is called slightly earlier than before. This ensures
that n_gc_threads can never be observed to increase from 0 by a worker thread.
startup_gc_threads is removed. It consisted of a single variable
assignment, which is moved inline to it's single callsite.
Diffstat (limited to 'rts')
-rw-r--r-- | rts/sm/GC.c | 333 | ||||
-rw-r--r-- | rts/sm/GC.h | 1 | ||||
-rw-r--r-- | rts/sm/GCUtils.c | 37 |
3 files changed, 237 insertions, 134 deletions
diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 4c46e82200..64a4c071b6 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -130,11 +130,30 @@ MutListScavStats mutlist_scav_stats; */ gc_thread **gc_threads = NULL; -#if !defined(THREADED_RTS) + +// see Note [Synchronising work stealing] +static StgWord gc_running_threads; + +#if defined(THREADED_RTS) + +static Mutex gc_running_mutex; +static Condition gc_running_cv; + +static Mutex gc_entry_mutex; +static StgInt n_gc_entered = 0; +static Condition gc_entry_arrived_cv; +static Condition gc_entry_start_now_cv; + +static Mutex gc_exit_mutex; +static StgInt n_gc_exited = 0; +static Condition gc_exit_arrived_cv; +static Condition gc_exit_leave_now_cv; + +#else // THREADED_RTS // Must be aligned to 64-bytes to meet stated 64-byte alignment of gen_workspace StgWord8 the_gc_thread[sizeof(gc_thread) + 64 * sizeof(gen_workspace)] ATTRIBUTE_ALIGNED(64); -#endif +#endif // THREADED_RTS /* Note [n_gc_threads] This is a global variable that originally tracked the number of threads @@ -153,6 +172,8 @@ for(i=0;i<n_gc_threads;++i) { foo(gc_thread[i]; )} Omitting this check has led to issues such as #19147. */ uint32_t n_gc_threads; +static uint32_t n_gc_idle_threads; +bool work_stealing; // For stats: static long copied; // *words* copied & scavenged during this GC @@ -162,9 +183,7 @@ static long copied; // *words* copied & scavenged during this GC volatile StgWord64 waitForGcThreads_spin = 0; volatile StgWord64 waitForGcThreads_yield = 0; volatile StgWord64 whitehole_gc_spin = 0; -#endif - -bool work_stealing; +#endif // PROF_SPIN uint32_t static_flag = STATIC_FLAG_B; uint32_t prev_static_flag = STATIC_FLAG_A; @@ -180,7 +199,6 @@ static void prepare_collected_gen (generation *gen); static void prepare_uncollected_gen (generation *gen); static void init_gc_thread (gc_thread *t); static void resize_nursery (void); -static void start_gc_threads (void); static void scavenge_until_all_done (void); static StgWord inc_running (void); static StgWord dec_running (void); @@ -345,8 +363,35 @@ GarbageCollect (uint32_t collect_gen, } #if defined(THREADED_RTS) + /* How many threads will be participating in this GC? + * We don't always parallelise minor GCs, or mark/compact/sweep GC. + * The policy on when to do a parallel GC is controlled by RTS flags (see + * below) + + * There are subtleties here. In the PAR case, we copy n_gc_threads from + * n_capabilities, presumably so that n_capabailites doesn' change under us. I + * don't understand quite how that happens, but the test setnumcapabilities001 + * demonstrates it. + * + * we set n_gc_threads, work_stealing, n_gc_idle_threads, gc_running_threads + * here + */ + if (gc_type == SYNC_GC_PAR) { + n_gc_threads = n_capabilities; + n_gc_idle_threads = 0; + for (uint32_t i = 0; i < n_capabilities; ++i) { + if (idle_cap[i]) { + ASSERT(i != gct->thread_index); + ++n_gc_idle_threads; + } + } + } else { + n_gc_threads = 1; + n_gc_idle_threads = n_capabilities - 1; + } work_stealing = RtsFlags.ParFlags.parGcLoadBalancingEnabled && - N >= RtsFlags.ParFlags.parGcLoadBalancingGen; + N >= RtsFlags.ParFlags.parGcLoadBalancingGen && + n_gc_threads > 1; // It's not always a good idea to do load balancing in parallel // GC. In particular, for a parallel program we don't want to // lose locality by moving cached data into another CPU's cache @@ -356,28 +401,25 @@ GarbageCollect (uint32_t collect_gen, // work stealing or not, e.g. it might be a good idea to do it // if the heap is big. For now, we just turn it on or off with // a flag. -#endif - - /* Start threads, so they can be spinning up while we finish initialisation. - */ - start_gc_threads(); - -#if defined(THREADED_RTS) - /* How many threads will be participating in this GC? - * We don't try to parallelise minor GCs (unless the user asks for - * it with +RTS -gn0), or mark/compact/sweep GC. - */ - if (gc_type == SYNC_GC_PAR) { - n_gc_threads = n_capabilities; - } else { - n_gc_threads = 1; - } #else n_gc_threads = 1; + n_gc_idle_threads = 0; + work_stealing = false; #endif - debugTrace(DEBUG_gc, "GC (gen %d, using %d thread(s))", - N, n_gc_threads); + SEQ_CST_STORE(&gc_running_threads, 0); + + ASSERT(n_gc_threads > 0); + ASSERT(n_gc_threads <= n_capabilities); + ASSERT(n_gc_idle_threads < n_capabilities); + // If we are work stealing, there better be another(i.e. not us) non-idle gc + // thread + ASSERT(!work_stealing || n_gc_threads - 1 > n_gc_idle_threads); + + + debugTrace(DEBUG_gc, "GC (gen %d, using %d thread(s), %s work stealing)", + N, (int)n_capabilities - (int)n_gc_idle_threads, + work_stealing ? "with": "without"); #if defined(DEBUG) // check for memory leaks if DEBUG is on @@ -486,30 +528,22 @@ GarbageCollect (uint32_t collect_gen, /* ------------------------------------------------------------------------- * Repeatedly scavenge all the areas we know about until there's no * more scavenging to be done. + * see Note [Synchronising work stealing] */ + scavenge_until_all_done(); + shutdown_gc_threads(gct->thread_index, idle_cap); StgWeak *dead_weak_ptr_list = NULL; StgTSO *resurrected_threads = END_TSO_QUEUE; - - for (;;) + // must be last... invariant is that everything is fully + // scavenged at this point. + work_stealing = false; + while (traverseWeakPtrList(&dead_weak_ptr_list, &resurrected_threads)) { + inc_running(); scavenge_until_all_done(); - - // The other threads are now stopped. We might recurse back to - // here, but from now on this is the only thread. - - // must be last... invariant is that everything is fully - // scavenged at this point. - if (traverseWeakPtrList(&dead_weak_ptr_list, &resurrected_threads)) { // returns true if evaced something - inc_running(); - continue; - } - - // If we get to here, there's really nothing left to do. - break; } - shutdown_gc_threads(gct->thread_index, idle_cap); // Now see which stable names are still alive. gcStableNameTable(); @@ -1117,6 +1151,8 @@ initGcThreads (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS) } else { gc_threads = stgMallocBytes (to * sizeof(gc_thread*), "initGcThreads"); + initMutex(&gc_running_mutex); + initCondition(&gc_running_cv); } for (i = from; i < to; i++) { @@ -1149,7 +1185,10 @@ freeGcThreads (void) } stgFree (gc_threads[i]); } + closeCondition(&gc_running_cv); + closeMutex(&gc_running_mutex); stgFree (gc_threads); + #else for (g = 0; g < RtsFlags.GcFlags.generations; g++) { @@ -1165,11 +1204,11 @@ freeGcThreads (void) Start GC threads ------------------------------------------------------------------------- */ -static volatile StgWord gc_running_threads; - static StgWord inc_running (void) { + // We don't hold gc_running_mutex. + // See Note [Synchronising work stealing] StgWord new; new = atomic_inc(&gc_running_threads, 1); ASSERT(new <= n_gc_threads); @@ -1179,101 +1218,112 @@ inc_running (void) static StgWord dec_running (void) { + // ref Note [Synchronising work stealing] ASSERT(RELAXED_LOAD(&gc_running_threads) != 0); - return atomic_dec(&gc_running_threads); -} - -static bool -any_work (void) -{ - int g; - gen_workspace *ws; - - NONATOMIC_ADD(&gct->any_work, 1); +#if defined(THREADED_RTS) + ACQUIRE_LOCK(&gc_running_mutex); +#endif - write_barrier(); + StgWord r = atomic_dec(&gc_running_threads); - // scavenge objects in compacted generation - if (mark_stack_bd != NULL && !mark_stack_empty()) { - return true; +#if defined(THREADED_RTS) + if (r == 0) { + broadcastCondition(&gc_running_cv); } + RELEASE_LOCK(&gc_running_mutex); +#endif - // Check for global work in any gen. We don't need to check for - // local work, because we have already exited scavenge_loop(), - // which means there is no local work for this thread. - for (g = 0; g < (int)RtsFlags.GcFlags.generations; g++) { - ws = &gct->gens[g]; - if (ws->todo_large_objects) return true; - if (!looksEmptyWSDeque(ws->todo_q)) return true; - if (ws->todo_overflow) return true; - } + return r; +} -#if defined(THREADED_RTS) - if (work_stealing) { - uint32_t n; - // look for work to steal - for (n = 0; n < n_gc_threads; n++) { - if (n == gct->thread_index) continue; - for (g = RtsFlags.GcFlags.generations-1; g >= 0; g--) { - ws = &gc_threads[n]->gens[g]; - if (!looksEmptyWSDeque(ws->todo_q)) return true; - } +# if defined(THREADED_RTS) +void notifyTodoBlock(void) { + // See Note [Synchronising work stealing] + // We check work_stealing here because we can actually be called when + // gc_running_threads == 0, which triggers the asserts. This happens inside + // traverseWeakPtrList. + // So the gc leader conveniently sets work_stealing to false before it + // begins the final sequential collections. + if(work_stealing) { + // This is racy. However the consequences are slight. + // If we see too many threads running we won't signal the condition + // variable. That's ok, we'll signal when the next block is pushed. + // There are loads of blocks. + // If we see too few threads running we will signal the condition + // variable, but there will be no waiters. This is very cheap. + StgInt running_threads = SEQ_CST_LOAD(&gc_running_threads); + StgInt max_running_threads = (StgInt)n_gc_threads - (StgInt)n_gc_idle_threads; + // These won't hold if !work_stealing, because it may be that: + // n_gc_threads < n_gc_idle_threads + ASSERT(running_threads > 0); + ASSERT(max_running_threads > 0); + ASSERT(running_threads <= max_running_threads); + if(running_threads < max_running_threads) { + signalCondition(&gc_running_cv); } } +} #endif - gct->no_work++; -#if defined(THREADED_RTS) - yieldThread(); -#endif - - return false; -} static void scavenge_until_all_done (void) { - DEBUG_ONLY( uint32_t r ); - + uint32_t r USED_IF_DEBUG USED_IF_THREADS; -loop: + for(;;) { #if defined(THREADED_RTS) - if (n_gc_threads > 1) { - scavenge_loop(); - } else { - scavenge_loop1(); - } + if (n_gc_threads > 1) { + scavenge_loop(); + } else { + scavenge_loop1(); + } #else - scavenge_loop(); + scavenge_loop(); #endif - collect_gct_blocks(); + collect_gct_blocks(); - // scavenge_loop() only exits when there's no work to do + // scavenge_loop() only exits when there's no work to do - // This atomic decrement also serves as a full barrier to ensure that any - // writes we made during scavenging are visible to other threads. -#if defined(DEBUG) - r = dec_running(); -#else - dec_running(); -#endif + // This atomic decrement also serves as a full barrier to ensure that any + // writes we made during scavenging are visible to other threads. + r = dec_running(); - traceEventGcIdle(gct->cap); + traceEventGcIdle(gct->cap); - debugTrace(DEBUG_gc, "%d GC threads still running", r); + debugTrace(DEBUG_gc, "%d GC threads still running", r); - while (SEQ_CST_LOAD(&gc_running_threads) != 0) { - // usleep(1); - if (any_work()) { - inc_running(); - traceEventGcWork(gct->cap); - goto loop; + // If there's no hope of stealing more work, then there's nowhere else + // work can come from and we are finished +#if defined(THREADED_RTS) + if(n_gc_threads > 1 && work_stealing && r != 0) { + NONATOMIC_ADD(&gct->any_work, 1); + ACQUIRE_LOCK(&gc_running_mutex); + // this is SEQ_CST because I haven't considered if it could be + // weaker + r = SEQ_CST_LOAD(&gc_running_threads); + if (r != 0) { + waitCondition(&gc_running_cv, &gc_running_mutex); + // this is SEQ_CST because I haven't considered if it could be + // weaker + r = SEQ_CST_LOAD(&gc_running_threads); + } + // here, if r is 0 then all threads are finished + // if r > 0 then either: + // - waitCondition was subject to spurious wakeup + // - a worker thread just pushed a block to it's todo_q + // so we loop back, looking for more work. + RELEASE_LOCK(&gc_running_mutex); + if (r != 0) { + inc_running(); + traceEventGcWork(gct->cap); + continue; // for(;;) loop + } + NONATOMIC_ADD(&gct->no_work, 1); } - // any_work() does not remove the work from the queue, it - // just checks for the presence of work. If we find any, - // then we increment gc_running_threads and go back to - // scavenge_loop() to perform any pending work. +#endif + break; // for(;;) loop } traceEventGcDone(gct->cap); @@ -1405,14 +1455,6 @@ waitForGcThreads (Capability *cap USED_IF_THREADS, bool idle_cap[]) #endif // THREADED_RTS static void -start_gc_threads (void) -{ -#if defined(THREADED_RTS) - gc_running_threads = 0; -#endif -} - -static void wakeup_gc_threads (uint32_t me USED_IF_THREADS, bool idle_cap[] USED_IF_THREADS) { @@ -1437,7 +1479,7 @@ wakeup_gc_threads (uint32_t me USED_IF_THREADS, // After GC is complete, we must wait for all GC threads to enter the // standby state, otherwise they may still be executing inside -// any_work(), and may even remain awake until the next GC starts. +// scavenge_until_all_done(), and may even remain awake until the next GC starts. static void shutdown_gc_threads (uint32_t me USED_IF_THREADS, bool idle_cap[] USED_IF_THREADS) @@ -2088,3 +2130,50 @@ bool doIdleGCWork(Capability *cap STG_UNUSED, bool all) { return runSomeFinalizers(all); } + + +/* Note [Synchronising work stealing] + * + * During parallel garbage collections, idle gc threads will steal work from + * other threads. If they see no work to steal then they will wait on a + * condition variabl(gc_running_cv). + * + * There are two synchronisation primitivees: + * - gc_running_mutex + * - gc_running_cv + * + * Two mutable variables + * - gc_running_threads + * - work_stealing + * + * Two immutable variables + * - n_gc_threads + * - n_gc_idle_threads + * + * gc_running_threads is modified only through the functions inc_running and + * dec_running are called when a gc thread starts(wakeup_gc_threads), runs out + * of work(scavenge_until_all_done), or finds more + * work(scavenge_until_all_done). + * + * We care about the value of gc_running_threads in two places. + * (a) in dec_running, if gc_running_threads reaches 0 then we broadcast + * gc_running_cv so that all gc_threads can exis scavenge_until_all_done. + * (b) in notifyTodoBlock, if there are any threads not running, then we + * signal gc_running_cv so a thread can try stealing some work. + * + * Note that: + * (c) inc_running does not hold gc_running_mutex while incrementing + * gc_running_threads. + * (d) notifyTodoBlock does not hold gc_running_mutex while inspecting + * gc_running_mutex. + * (d) The gc leader calls shutdown_gc_threads before it begins the final + * sequential collections (i.e. traverseWeakPtrList) + * (e) A gc worker thread can never observe gc_running_threads increasing + * from 0. gc_running_threads will increase from 0, but this is after (d), + * where gc worker threads are all finished. + * (f) The check in (b) tolerates wrong values of gc_running_threads. See the + * function for details. + * + * work_stealing is "mostly immutable". We set it to false when we begin the + * final sequential collections, for the benefit of notifyTodoBlock. + * */ diff --git a/rts/sm/GC.h b/rts/sm/GC.h index 2c2d14a7d2..239f281910 100644 --- a/rts/sm/GC.h +++ b/rts/sm/GC.h @@ -76,6 +76,7 @@ void freeGcThreads (void); void resizeGenerations (void); #if defined(THREADED_RTS) +void notifyTodoBlock (void); void waitForGcThreads (Capability *cap, bool idle_cap[]); void releaseGCThreads (Capability *cap, bool idle_cap[]); #endif diff --git a/rts/sm/GCUtils.c b/rts/sm/GCUtils.c index d58fdc48ae..ea7d83ab8f 100644 --- a/rts/sm/GCUtils.c +++ b/rts/sm/GCUtils.c @@ -30,6 +30,8 @@ SpinLock gc_alloc_block_sync; #endif +static void push_todo_block(bdescr *bd, gen_workspace *ws); + bdescr* allocGroup_sync(uint32_t n) { bdescr *bd; @@ -127,7 +129,8 @@ steal_todo_block (uint32_t g) // look for work to steal for (n = 0; n < n_gc_threads; n++) { if (n == gct->thread_index) continue; - bd = stealWSDeque(gc_threads[n]->gens[g].todo_q); + q = gc_threads[n]->gens[g].todo_q; + bd = stealWSDeque(q); if (bd) { return bd; } @@ -168,6 +171,26 @@ push_scanned_block (bdescr *bd, gen_workspace *ws) } } +void +push_todo_block(bdescr *bd, gen_workspace *ws) +{ + debugTrace(DEBUG_gc, "push todo block %p (%ld words), step %d, todo_q: %ld", + bd->start, (unsigned long)(bd->free - bd->u.scan), + ws->gen->no, dequeElements(ws->todo_q)); + + ASSERT(bd->link == NULL); + + if(!pushWSDeque(ws->todo_q, bd)) { + bd->link = ws->todo_overflow; + ws->todo_overflow = bd; + ws->n_todo_overflow++; + } + +#if defined(THREADED_RTS) + notifyTodoBlock(); +#endif +} + /* Note [big objects] We can get an ordinary object (CONSTR, FUN, THUNK etc.) that is @@ -277,17 +300,7 @@ todo_block_full (uint32_t size, gen_workspace *ws) // Otherwise, push this block out to the global list. else { - DEBUG_ONLY( generation *gen ); - DEBUG_ONLY( gen = ws->gen ); - debugTrace(DEBUG_gc, "push todo block %p (%ld words), step %d, todo_q: %ld", - bd->start, (unsigned long)(bd->free - bd->u.scan), - gen->no, dequeElements(ws->todo_q)); - - if (!pushWSDeque(ws->todo_q, bd)) { - bd->link = ws->todo_overflow; - ws->todo_overflow = bd; - ws->n_todo_overflow++; - } + push_todo_block(bd, ws); } } |