-rw-r--r--   includes/RtsFlags.h |   4
-rw-r--r--   includes/Storage.h  |   2
-rw-r--r--   rts/Capability.c    |  74
-rw-r--r--   rts/Capability.h    |   6
-rw-r--r--   rts/RtsFlags.c      |  42
-rw-r--r--   rts/Schedule.c      | 117
-rw-r--r--   rts/Stats.c         |   4
-rw-r--r--   rts/sm/GC.c         | 237
-rw-r--r--   rts/sm/GC.h         |   4
-rw-r--r--   rts/sm/GCThread.h   |   7
-rw-r--r--   rts/sm/Storage.c    |   4
11 files changed, 278 insertions, 223 deletions
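
Note on the flag changes below (rts/RtsFlags.c): the old -g<n> knob is retired (-g1 survives only for backwards compatibility and simply disables parallel GC). Parallel GC is now enabled by default; -q1 switches it off, and -qg<n> restricts it to collections of generation <n> and older (default 1, so minor GCs stay single-threaded), with mark/compact collections of the oldest generation always falling back to single-threaded GC, as the scheduleDoGC test shows. A rough illustration of the new command lines, with prog standing in for any threaded Haskell program:

  ./prog +RTS -N4 -RTS          -- default: parallel GC for generation 1 and older
  ./prog +RTS -N4 -qg0 -RTS     -- parallel GC for every collection, minor GCs included
  ./prog +RTS -N4 -q1 -RTS      -- single-threaded GC (the old -g1 behaviour)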
diff --git a/includes/RtsFlags.h b/includes/RtsFlags.h index 55b00bb2a0..e14c9405ff 100644 --- a/includes/RtsFlags.h +++ b/includes/RtsFlags.h @@ -179,7 +179,9 @@ struct PAR_FLAGS { rtsBool migrate; /* migrate threads between capabilities */ rtsBool wakeupMigrate; /* migrate a thread on wakeup */ unsigned int maxLocalSparks; - nat gcThreads; /* number of threads for parallel GC */ + rtsBool parGcEnabled; /* enable parallel GC */ + rtsBool parGcGen; /* do parallel GC in this generation + * and higher only */ }; #endif /* THREADED_RTS */ diff --git a/includes/Storage.h b/includes/Storage.h index d431298af9..0a7aae6750 100644 --- a/includes/Storage.h +++ b/includes/Storage.h @@ -220,7 +220,7 @@ extern bdescr * splitLargeBlock (bdescr *bd, nat blocks); -------------------------------------------------------------------------- */ -extern void GarbageCollect(rtsBool force_major_gc); +extern void GarbageCollect(rtsBool force_major_gc, nat gc_type, Capability *cap); /* ----------------------------------------------------------------------------- Generational garbage collection support diff --git a/rts/Capability.c b/rts/Capability.c index 8dddbc5d34..7c6ceb5c66 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -26,6 +26,7 @@ #include "Schedule.h" #include "Sparks.h" #include "Trace.h" +#include "GC.h" // one global capability, this is the Capability for non-threaded // builds, and for +RTS -N1 @@ -190,6 +191,7 @@ initCapability( Capability *cap, nat i ) cap->no = i; cap->in_haskell = rtsFalse; + cap->in_gc = rtsFalse; cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; @@ -358,14 +360,7 @@ releaseCapability_ (Capability* cap, return; } - /* if waiting_for_gc was the reason to release the cap: thread - comes from yieldCap->releaseAndQueueWorker. Unconditionally set - cap. free and return (see default after the if-protected other - special cases). Thread will wait on cond.var and re-acquire the - same cap after GC (GC-triggering cap. calls releaseCap and - enters the spare_workers case) - */ - if (waiting_for_gc) { + if (waiting_for_gc == PENDING_GC_SEQ) { last_free_capability = cap; // needed? trace(TRACE_sched | DEBUG_sched, "GC pending, set capability %d free", cap->no); @@ -557,6 +552,12 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; + if (waiting_for_gc == PENDING_GC_PAR) { + debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no); + gcWorkerThread(cap); + return; + } + debugTrace(DEBUG_sched, "giving up capability %d", cap->no); // We must now release the capability and wait to be woken up @@ -655,58 +656,21 @@ wakeupThreadOnCapability (Capability *my_cap, } /* ---------------------------------------------------------------------------- - * prodCapabilities + * prodCapability * - * Used to indicate that the interrupted flag is now set, or some - * other global condition that might require waking up a Task on each - * Capability. 
- * ------------------------------------------------------------------------- */ - -static void -prodCapabilities(rtsBool all) -{ - nat i; - Capability *cap; - Task *task; - - for (i=0; i < n_capabilities; i++) { - cap = &capabilities[i]; - ACQUIRE_LOCK(&cap->lock); - if (!cap->running_task) { - if (cap->spare_workers) { - trace(TRACE_sched, "resuming capability %d", cap->no); - task = cap->spare_workers; - ASSERT(!task->stopped); - giveCapabilityToTask(cap,task); - if (!all) { - RELEASE_LOCK(&cap->lock); - return; - } - } - } - RELEASE_LOCK(&cap->lock); - } - return; -} - -void -prodAllCapabilities (void) -{ - prodCapabilities(rtsTrue); -} - -/* ---------------------------------------------------------------------------- - * prodOneCapability - * - * Like prodAllCapabilities, but we only require a single Task to wake - * up in order to service some global event, such as checking for - * deadlock after some idle time has passed. + * If a Capability is currently idle, wake up a Task on it. Used to + * get every Capability into the GC. * ------------------------------------------------------------------------- */ void -prodOneCapability (void) +prodCapability (Capability *cap, Task *task) { - prodCapabilities(rtsFalse); + ACQUIRE_LOCK(&cap->lock); + if (!cap->running_task) { + cap->running_task = task; + releaseCapability_(cap,rtsTrue); + } + RELEASE_LOCK(&cap->lock); } /* ---------------------------------------------------------------------------- diff --git a/rts/Capability.h b/rts/Capability.h index 89545780a4..478b0f1312 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -50,6 +50,9 @@ struct Capability_ { // catching unsafe call-ins. rtsBool in_haskell; + // true if this Capability is currently in the GC + rtsBool in_gc; + // The run queue. The Task owning this Capability has exclusive // access to its run queue, so can wake up threads without // taking a lock, and the common path through the scheduler is @@ -191,6 +194,8 @@ extern Capability *capabilities; extern Capability *last_free_capability; // GC indicator, in scope for the scheduler +#define PENDING_GC_SEQ 1 +#define PENDING_GC_PAR 2 extern volatile StgWord waiting_for_gc; // Acquires a capability at a return point. If *cap is non-NULL, then @@ -237,6 +242,7 @@ void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap, // need to service some global event. // void prodOneCapability (void); +void prodCapability (Capability *cap, Task *task); // Similar to prodOneCapability(), but prods all of them. 
// diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c index 1cbd569d6b..cce2b28fed 100644 --- a/rts/RtsFlags.c +++ b/rts/RtsFlags.c @@ -214,7 +214,8 @@ void initRtsFlagsDefaults(void) RtsFlags.ParFlags.nNodes = 1; RtsFlags.ParFlags.migrate = rtsTrue; RtsFlags.ParFlags.wakeupMigrate = rtsFalse; - RtsFlags.ParFlags.gcThreads = 1; + RtsFlags.ParFlags.parGcEnabled = 1; + RtsFlags.ParFlags.parGcGen = 1; #endif #ifdef PAR @@ -450,8 +451,9 @@ usage_text[] = { "", #endif /* DEBUG */ #if defined(THREADED_RTS) && !defined(NOSMP) -" -N<n> Use <n> OS threads (default: 1) (also sets -g)", -" -g<n> Use <n> OS threads for GC (default: 1)", +" -N<n> Use <n> OS threads (default: 1)", +" -q1 Use one OS thread for GC (turns off parallel GC)", +" -qg<n> Use parallel GC only for generations >= <n> (default: 1)", " -qm Don't automatically migrate threads between CPUs", " -qw Migrate a thread to the current CPU when it is woken up", #endif @@ -1132,8 +1134,6 @@ error = rtsTrue; if (rts_argv[arg][2] != '\0') { RtsFlags.ParFlags.nNodes = strtol(rts_argv[arg]+2, (char **) NULL, 10); - // set -g at the same time as -N by default - RtsFlags.ParFlags.gcThreads = RtsFlags.ParFlags.nNodes; if (RtsFlags.ParFlags.nNodes <= 0) { errorBelch("bad value for -N"); error = rtsTrue; @@ -1149,15 +1149,17 @@ error = rtsTrue; case 'g': THREADED_BUILD_ONLY( - if (rts_argv[arg][2] != '\0') { - RtsFlags.ParFlags.gcThreads - = strtol(rts_argv[arg]+2, (char **) NULL, 10); - if (RtsFlags.ParFlags.gcThreads <= 0) { - errorBelch("bad value for -g"); - error = rtsTrue; - } - } - ) break; + switch (rts_argv[arg][2]) { + case '1': + // backwards compat only + RtsFlags.ParFlags.parGcEnabled = rtsFalse; + break; + default: + errorBelch("unknown RTS option: %s",rts_argv[arg]); + error = rtsTrue; + break; + } + ) break; case 'q': switch (rts_argv[arg][2]) { @@ -1165,6 +1167,18 @@ error = rtsTrue; errorBelch("incomplete RTS option: %s",rts_argv[arg]); error = rtsTrue; break; + case '1': + RtsFlags.ParFlags.parGcEnabled = rtsFalse; + break; + case 'g': + if (rts_argv[arg][3] != '\0') { + RtsFlags.ParFlags.parGcGen + = strtol(rts_argv[arg]+3, (char **) NULL, 10); + } else { + errorBelch("bad value for -qg"); + error = rtsTrue; + } + break; case 'm': RtsFlags.ParFlags.migrate = rtsFalse; break; diff --git a/rts/Schedule.c b/rts/Schedule.c index 7dd063423f..31a487515a 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -31,6 +31,7 @@ #include "Updates.h" #include "Proftimer.h" #include "ProfHeap.h" +#include "GC.h" /* PARALLEL_HASKELL includes go here */ @@ -1478,7 +1479,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) #ifdef THREADED_RTS /* extern static volatile StgWord waiting_for_gc; lives inside capability.c */ - rtsBool was_waiting; + rtsBool gc_type, prev_pending_gc; nat i; #endif @@ -1490,6 +1491,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) } #ifdef THREADED_RTS + if (sched_state < SCHED_INTERRUPTING + && RtsFlags.ParFlags.parGcEnabled + && N >= RtsFlags.ParFlags.parGcGen + && ! oldest_gen->steps[0].mark) + { + gc_type = PENDING_GC_PAR; + } else { + gc_type = PENDING_GC_SEQ; + } + // In order to GC, there must be no threads running Haskell code. // Therefore, the GC thread needs to hold *all* the capabilities, // and release them after the GC has completed. @@ -1500,39 +1511,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // actually did the GC. But it's quite hard to arrange for all // the other tasks to sleep and stay asleep. 
// - + /* Other capabilities are prevented from running yet more Haskell threads if waiting_for_gc is set. Tested inside yieldCapability() and releaseCapability() in Capability.c */ - was_waiting = cas(&waiting_for_gc, 0, 1); - if (was_waiting) { + prev_pending_gc = cas(&waiting_for_gc, 0, gc_type); + if (prev_pending_gc) { do { - debugTrace(DEBUG_sched, "someone else is trying to GC..."); - if (cap) yieldCapability(&cap,task); + debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", + prev_pending_gc); + ASSERT(cap); + yieldCapability(&cap,task); } while (waiting_for_gc); return cap; // NOTE: task->cap might have changed here } setContextSwitches(); - for (i=0; i < n_capabilities; i++) { - debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); - if (cap != &capabilities[i]) { - Capability *pcap = &capabilities[i]; - // we better hope this task doesn't get migrated to - // another Capability while we're waiting for this one. - // It won't, because load balancing happens while we have - // all the Capabilities, but even so it's a slightly - // unsavoury invariant. - task->cap = pcap; - waitForReturnCapability(&pcap, task); - if (pcap != &capabilities[i]) { - barf("scheduleDoGC: got the wrong capability"); - } - } + + // The final shutdown GC is always single-threaded, because it's + // possible that some of the Capabilities have no worker threads. + + if (gc_type == PENDING_GC_SEQ) + { + // single-threaded GC: grab all the capabilities + for (i=0; i < n_capabilities; i++) { + debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); + if (cap != &capabilities[i]) { + Capability *pcap = &capabilities[i]; + // we better hope this task doesn't get migrated to + // another Capability while we're waiting for this one. + // It won't, because load balancing happens while we have + // all the Capabilities, but even so it's a slightly + // unsavoury invariant. + task->cap = pcap; + waitForReturnCapability(&pcap, task); + if (pcap != &capabilities[i]) { + barf("scheduleDoGC: got the wrong capability"); + } + } + } } + else + { + // multi-threaded GC: make sure all the Capabilities donate one + // GC thread each. + debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); - waiting_for_gc = rtsFalse; + waitForGcThreads(cap); + } #endif // so this happens periodically: @@ -1545,23 +1572,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) * state, then we should take the opportunity to delete all the * threads in the system. */ - if (sched_state >= SCHED_INTERRUPTING) { - deleteAllThreads(&capabilities[0]); + if (sched_state == SCHED_INTERRUPTING) { + deleteAllThreads(cap); sched_state = SCHED_SHUTTING_DOWN; } heap_census = scheduleNeedHeapProfile(rtsTrue); - /* everybody back, start the GC. - * Could do it in this thread, or signal a condition var - * to do it in another thread. Either way, we need to - * broadcast on gc_pending_cond afterward. - */ #if defined(THREADED_RTS) debugTrace(DEBUG_sched, "doing GC"); + // reset waiting_for_gc *before* GC, so that when the GC threads + // emerge they don't immediately re-enter the GC. 
+ waiting_for_gc = 0; + GarbageCollect(force_major || heap_census, gc_type, cap); +#else + GarbageCollect(force_major || heap_census, 0, cap); #endif - GarbageCollect(force_major || heap_census); - + if (heap_census) { debugTrace(DEBUG_sched, "performing heap census"); heapCensus(); @@ -1587,12 +1614,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) } #if defined(THREADED_RTS) - // release our stash of capabilities. - for (i = 0; i < n_capabilities; i++) { - if (cap != &capabilities[i]) { - task->cap = &capabilities[i]; - releaseCapability(&capabilities[i]); - } + if (gc_type == PENDING_GC_SEQ) { + // release our stash of capabilities. + for (i = 0; i < n_capabilities; i++) { + if (cap != &capabilities[i]) { + task->cap = &capabilities[i]; + releaseCapability(&capabilities[i]); + } + } } if (cap) { task->cap = cap; @@ -2131,7 +2160,13 @@ exitScheduler( // If we haven't killed all the threads yet, do it now. if (sched_state < SCHED_SHUTTING_DOWN) { sched_state = SCHED_INTERRUPTING; - scheduleDoGC(NULL,task,rtsFalse); +#if defined(THREADED_RTS) + waitForReturnCapability(&task->cap,task); + scheduleDoGC(task->cap,task,rtsFalse); + releaseCapability(task->cap); +#else + scheduleDoGC(&MainCapability,task,rtsFalse); +#endif } sched_state = SCHED_SHUTTING_DOWN; @@ -2184,13 +2219,17 @@ static void performGC_(rtsBool force_major) { Task *task; + // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the // suspended_ccalling_tasks queue. ACQUIRE_LOCK(&sched_mutex); task = newBoundTask(); RELEASE_LOCK(&sched_mutex); - scheduleDoGC(NULL,task,force_major); + + waitForReturnCapability(&task->cap,task); + scheduleDoGC(task->cap,task,force_major); + releaseCapability(task->cap); boundTaskExiting(task); } diff --git a/rts/Stats.c b/rts/Stats.c index 228f0c021e..9c17856970 100644 --- a/rts/Stats.c +++ b/rts/Stats.c @@ -613,11 +613,11 @@ stat_exit(int alloc) } #if defined(THREADED_RTS) - if (RtsFlags.ParFlags.gcThreads > 1) { + if (RtsFlags.ParFlags.parGcEnabled) { statsPrintf("\n Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n", (double)GC_par_avg_copied / (double)GC_par_max_copied, (lnat)GC_par_avg_copied, (lnat)GC_par_max_copied, - RtsFlags.ParFlags.gcThreads + RtsFlags.ParFlags.nNodes ); } #endif diff --git a/rts/sm/GC.c b/rts/sm/GC.c index aff33201a9..bf2464bb15 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -138,7 +138,6 @@ DECLARE_GCT static void mark_root (void *user, StgClosure **root); static void zero_static_object_list (StgClosure* first_static); static nat initialise_N (rtsBool force_major_gc); -static void alloc_gc_threads (void); static void init_collected_gen (nat g, nat threads); static void init_uncollected_gen (nat g, nat threads); static void init_gc_thread (gc_thread *t); @@ -149,8 +148,9 @@ static void start_gc_threads (void); static void scavenge_until_all_done (void); static nat inc_running (void); static nat dec_running (void); -static void wakeup_gc_threads (nat n_threads); -static void shutdown_gc_threads (nat n_threads); +static void wakeup_gc_threads (nat n_threads, nat me); +static void shutdown_gc_threads (nat n_threads, nat me); +static void continue_gc_threads (nat n_threads, nat me); #if 0 && defined(DEBUG) static void gcCAFs (void); @@ -180,7 +180,9 @@ StgPtr oldgen_scan; -------------------------------------------------------------------------- */ void -GarbageCollect ( rtsBool force_major_gc ) +GarbageCollect (rtsBool force_major_gc, + nat gc_type 
USED_IF_THREADS, + Capability *cap USED_IF_THREADS) { bdescr *bd; step *stp; @@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc ) */ n = initialise_N(force_major_gc); - /* Allocate + initialise the gc_thread structures. - */ - alloc_gc_threads(); - /* Start threads, so they can be spinning up while we finish initialisation. */ start_gc_threads(); +#if defined(THREADED_RTS) /* How many threads will be participating in this GC? - * We don't try to parallelise minor GC, or mark/compact/sweep GC. + * We don't try to parallelise minor GCs (unless the user asks for + * it with +RTS -gn0), or mark/compact/sweep GC. */ -#if defined(THREADED_RTS) - if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) { - n_gc_threads = 1; + if (gc_type == PENDING_GC_PAR) { + n_gc_threads = RtsFlags.ParFlags.nNodes; } else { - n_gc_threads = RtsFlags.ParFlags.gcThreads; + n_gc_threads = 1; } #else n_gc_threads = 1; #endif + trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)", N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads); @@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc ) } // this is the main thread +#ifdef THREADED_RTS + if (n_gc_threads == 1) { + gct = gc_threads[0]; + } else { + gct = gc_threads[cap->no]; + } +#else gct = gc_threads[0]; +#endif /* ----------------------------------------------------------------------- * follow all the roots that we know about: @@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc ) // NB. do this after the mutable lists have been saved above, otherwise // the other GC threads will be writing into the old mutable lists. inc_running(); - wakeup_gc_threads(n_gc_threads); + wakeup_gc_threads(n_gc_threads, gct->thread_index); for (g = RtsFlags.GcFlags.generations-1; g > N; g--) { scavenge_mutable_list(&generations[g]); @@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc ) break; } - shutdown_gc_threads(n_gc_threads); + shutdown_gc_threads(n_gc_threads, gct->thread_index); // Update pointers from the Task list update_task_list(); @@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc ) slop = calcLiveBlocks() * BLOCK_SIZE_W - live; stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop); + // Guess which generation we'll collect *next* time + initialise_N(force_major_gc); + #if defined(RTS_USER_SIGNALS) if (RtsFlags.MiscFlags.install_signal_handlers) { // unblock signals again @@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc ) } #endif + continue_gc_threads(n_gc_threads, gct->thread_index); + RELEASE_SM_LOCK; gct = saved_gct; @@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc) Initialise the gc_thread structures. 
-------------------------------------------------------------------------- */ +#define GC_THREAD_INACTIVE 0 +#define GC_THREAD_STANDING_BY 1 +#define GC_THREAD_RUNNING 2 +#define GC_THREAD_WAITING_TO_CONTINUE 3 + static gc_thread * alloc_gc_thread (int n) { @@ -826,11 +844,11 @@ alloc_gc_thread (int n) #ifdef THREADED_RTS t->id = 0; - initCondition(&t->wake_cond); - initMutex(&t->wake_mutex); - t->wakeup = rtsTrue; // starts true, so we can wait for the + initSpinLock(&t->gc_spin); + initSpinLock(&t->mut_spin); + ACQUIRE_SPIN_LOCK(&t->gc_spin); + t->wakeup = GC_THREAD_INACTIVE; // starts true, so we can wait for the // thread to start up, see wakeup_gc_threads - t->exit = rtsFalse; #endif t->thread_index = n; @@ -864,17 +882,17 @@ alloc_gc_thread (int n) } -static void -alloc_gc_threads (void) +void +initGcThreads (void) { if (gc_threads == NULL) { #if defined(THREADED_RTS) nat i; - gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads * + gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes * sizeof(gc_thread*), "alloc_gc_threads"); - for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) { + for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) { gc_threads[i] = alloc_gc_thread(i); } #else @@ -992,113 +1010,107 @@ loop: } #if defined(THREADED_RTS) -// -// gc_thread_work(): Scavenge until there's no work left to do and all -// the running threads are idle. -// -static void -gc_thread_work (void) + +void +gcWorkerThread (Capability *cap) { - // gc_running_threads has already been incremented for us; this is - // a worker thread and the main thread bumped gc_running_threads - // before waking us up. + cap->in_gc = rtsTrue; + + gct = gc_threads[cap->no]; + gct->id = osThreadId(); + // Wait until we're told to wake up + RELEASE_SPIN_LOCK(&gct->mut_spin); + gct->wakeup = GC_THREAD_STANDING_BY; + debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index); + ACQUIRE_SPIN_LOCK(&gct->gc_spin); + +#ifdef USE_PAPI + // start performance counters in this thread... + if (gct->papi_events == -1) { + papi_init_eventset(&gct->papi_events); + } + papi_thread_start_gc1_count(gct->papi_events); +#endif + // Every thread evacuates some roots. gct->evac_step = 0; markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads, rtsTrue/*prune sparks*/); scavenge_until_all_done(); -} - - -static void -gc_thread_mainloop (void) -{ - while (!gct->exit) { - - // Wait until we're told to wake up - ACQUIRE_LOCK(&gct->wake_mutex); - gct->wakeup = rtsFalse; - while (!gct->wakeup) { - debugTrace(DEBUG_gc, "GC thread %d standing by...", - gct->thread_index); - waitCondition(&gct->wake_cond, &gct->wake_mutex); - } - RELEASE_LOCK(&gct->wake_mutex); - if (gct->exit) break; - + #ifdef USE_PAPI - // start performance counters in this thread... 
- if (gct->papi_events == -1) { - papi_init_eventset(&gct->papi_events); - } - papi_thread_start_gc1_count(gct->papi_events); + // count events in this thread towards the GC totals + papi_thread_stop_gc1_count(gct->papi_events); #endif - gc_thread_work(); + // Wait until we're told to continue + RELEASE_SPIN_LOCK(&gct->gc_spin); + gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE; + debugTrace(DEBUG_gc, "GC thread %d waiting to continue...", + gct->thread_index); + ACQUIRE_SPIN_LOCK(&gct->mut_spin); + debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index); +} -#ifdef USE_PAPI - // count events in this thread towards the GC totals - papi_thread_stop_gc1_count(gct->papi_events); -#endif - } -} #endif -#if defined(THREADED_RTS) -static void -gc_thread_entry (gc_thread *my_gct) +void +waitForGcThreads (Capability *cap USED_IF_THREADS) { - gct = my_gct; - debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index); - gct->id = osThreadId(); - gc_thread_mainloop(); -} +#if defined(THREADED_RTS) + nat n_threads = RtsFlags.ParFlags.nNodes; + nat me = cap->no; + nat i, j; + rtsBool retry = rtsTrue; + + while(retry) { + for (i=0; i < n_threads; i++) { + if (i == me) continue; + if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) { + prodCapability(&capabilities[i], cap->running_task); + } + } + for (j=0; j < 10000000; j++) { + retry = rtsFalse; + for (i=0; i < n_threads; i++) { + if (i == me) continue; + write_barrier(); + setContextSwitches(); + if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) { + retry = rtsTrue; + } + } + if (!retry) break; + } + } #endif +} static void start_gc_threads (void) { #if defined(THREADED_RTS) - nat i; - OSThreadId id; - static rtsBool done = rtsFalse; - gc_running_threads = 0; initMutex(&gc_running_mutex); - - if (!done) { - // Start from 1: the main thread is 0 - for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) { - createOSThread(&id, (OSThreadProc*)&gc_thread_entry, - gc_threads[i]); - } - done = rtsTrue; - } #endif } static void -wakeup_gc_threads (nat n_threads USED_IF_THREADS) +wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS) { #if defined(THREADED_RTS) nat i; - for (i=1; i < n_threads; i++) { + for (i=0; i < n_threads; i++) { + if (i == me) continue; inc_running(); debugTrace(DEBUG_gc, "waking up gc thread %d", i); - do { - ACQUIRE_LOCK(&gc_threads[i]->wake_mutex); - if (gc_threads[i]->wakeup) { - RELEASE_LOCK(&gc_threads[i]->wake_mutex); - continue; - } else { - break; - } - } while (1); - gc_threads[i]->wakeup = rtsTrue; - signalCondition(&gc_threads[i]->wake_cond); - RELEASE_LOCK(&gc_threads[i]->wake_mutex); + if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads"); + + gc_threads[i]->wakeup = GC_THREAD_RUNNING; + ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin); + RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin); } #endif } @@ -1107,18 +1119,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS) // standby state, otherwise they may still be executing inside // any_work(), and may even remain awake until the next GC starts. 
static void -shutdown_gc_threads (nat n_threads USED_IF_THREADS) +shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS) { #if defined(THREADED_RTS) nat i; - rtsBool wakeup; - for (i=1; i < n_threads; i++) { - do { - ACQUIRE_LOCK(&gc_threads[i]->wake_mutex); - wakeup = gc_threads[i]->wakeup; - // wakeup is false while the thread is waiting - RELEASE_LOCK(&gc_threads[i]->wake_mutex); - } while (wakeup); + for (i=0; i < n_threads; i++) { + if (i == me) continue; + while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); } + } +#endif +} + +static void +continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS) +{ +#if defined(THREADED_RTS) + nat i; + for (i=0; i < n_threads; i++) { + if (i == me) continue; + if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads"); + + gc_threads[i]->wakeup = GC_THREAD_INACTIVE; + ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin); + RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin); } #endif } diff --git a/rts/sm/GC.h b/rts/sm/GC.h index 6331320a30..5fb142f58f 100644 --- a/rts/sm/GC.h +++ b/rts/sm/GC.h @@ -40,6 +40,10 @@ extern SpinLock gc_alloc_block_sync; extern StgWord64 whitehole_spin; #endif +void gcWorkerThread (Capability *cap); +void initGcThreads (void); +void waitForGcThreads (Capability *cap); + #define WORK_UNIT_WORDS 128 #endif /* GC_H */ diff --git a/rts/sm/GCThread.h b/rts/sm/GCThread.h index 1b5c5d4291..d6af2b1571 100644 --- a/rts/sm/GCThread.h +++ b/rts/sm/GCThread.h @@ -113,10 +113,9 @@ typedef struct step_workspace_ { typedef struct gc_thread_ { #ifdef THREADED_RTS OSThreadId id; // The OS thread that this struct belongs to - Mutex wake_mutex; - Condition wake_cond; // So we can go to sleep between GCs - rtsBool wakeup; - rtsBool exit; + SpinLock gc_spin; + SpinLock mut_spin; + volatile rtsBool wakeup; #endif nat thread_index; // a zero based index identifying the thread diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c index 6c45cbed59..bf7c452d9b 100644 --- a/rts/sm/Storage.c +++ b/rts/sm/Storage.c @@ -276,6 +276,10 @@ initStorage( void ) whitehole_spin = 0; #endif + N = 0; + + initGcThreads(); + IF_DEBUG(gc, statDescribeGens()); RELEASE_SM_LOCK; |
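
The core of the new synchronisation in rts/sm/GC.c is that each GC worker is parked on a pair of spinlocks (gc_spin and mut_spin) and advertises its progress through the volatile wakeup field. The following is a minimal, self-contained sketch of that handshake, not RTS code: gc_thread_sketch, run_worker, the spin_* helpers and N_WORKERS are invented for this illustration, and the "1 means free" spinlock convention is an assumption meant to mirror ACQUIRE_SPIN_LOCK/RELEASE_SPIN_LOCK in the patch.

/* handshake_sketch.c -- compile with: cc -std=c11 -pthread handshake_sketch.c */
#include <pthread.h>
#include <stdatomic.h>
#include <stdio.h>

#define N_WORKERS 3

/* GC thread states, as defined in rts/sm/GC.c by the patch. */
enum { GC_THREAD_INACTIVE, GC_THREAD_STANDING_BY,
       GC_THREAD_RUNNING, GC_THREAD_WAITING_TO_CONTINUE };

/* A spinlock in the assumed RTS style: 1 means free, 0 means held. */
typedef atomic_int SpinLock;
static void spin_init(SpinLock *l)    { atomic_store(l, 1); }
static void spin_acquire(SpinLock *l) {
    int expected;
    do { expected = 1; } while (!atomic_compare_exchange_weak(l, &expected, 0));
}
static void spin_release(SpinLock *l) { atomic_store(l, 1); }

typedef struct {
    SpinLock gc_spin;    /* held by the GC leader except while workers scavenge */
    SpinLock mut_spin;   /* held by the GC leader for the duration of a GC */
    atomic_int wakeup;   /* polled by the leader, like gct->wakeup */
    int index;
} gc_thread_sketch;

static gc_thread_sketch threads[N_WORKERS];

/* Roughly what gcWorkerThread() does once per GC, reduced to the handshake. */
static void *run_worker(void *arg)
{
    gc_thread_sketch *t = arg;

    /* Stand by: release mut_spin (harmless if already free, mirroring the RTS
       ordering), announce STANDING_BY, then block on gc_spin until woken. */
    spin_release(&t->mut_spin);
    atomic_store(&t->wakeup, GC_THREAD_STANDING_BY);
    spin_acquire(&t->gc_spin);

    printf("worker %d scavenging\n", t->index);   /* GC work would go here */

    /* Hand gc_spin back, announce WAITING_TO_CONTINUE, then block on
       mut_spin until continue_gc_threads lets the mutator resume. */
    spin_release(&t->gc_spin);
    atomic_store(&t->wakeup, GC_THREAD_WAITING_TO_CONTINUE);
    spin_acquire(&t->mut_spin);
    return NULL;
}

int main(void)
{
    pthread_t tids[N_WORKERS];
    int i;

    for (i = 0; i < N_WORKERS; i++) {
        spin_init(&threads[i].gc_spin);
        spin_init(&threads[i].mut_spin);
        spin_acquire(&threads[i].gc_spin);        /* as in alloc_gc_thread() */
        atomic_store(&threads[i].wakeup, GC_THREAD_INACTIVE);
        threads[i].index = i;
        pthread_create(&tids[i], NULL, run_worker, &threads[i]);
    }

    /* waitForGcThreads(): poll until every worker is standing by. */
    for (i = 0; i < N_WORKERS; i++)
        while (atomic_load(&threads[i].wakeup) != GC_THREAD_STANDING_BY) ;

    /* wakeup_gc_threads(): grab each worker's mut_spin, then release its
       gc_spin so the worker can start scavenging. */
    for (i = 0; i < N_WORKERS; i++) {
        atomic_store(&threads[i].wakeup, GC_THREAD_RUNNING);
        spin_acquire(&threads[i].mut_spin);
        spin_release(&threads[i].gc_spin);
    }

    /* shutdown_gc_threads(): wait until everyone has finished scavenging. */
    for (i = 0; i < N_WORKERS; i++)
        while (atomic_load(&threads[i].wakeup) != GC_THREAD_WAITING_TO_CONTINUE) ;

    /* continue_gc_threads(): take gc_spin back and release mut_spin, letting
       each worker return to the mutator. */
    for (i = 0; i < N_WORKERS; i++) {
        atomic_store(&threads[i].wakeup, GC_THREAD_INACTIVE);
        spin_acquire(&threads[i].gc_spin);
        spin_release(&threads[i].mut_spin);
        pthread_join(tids[i], NULL);
    }
    return 0;
}

The point of the design is that a worker never leaves the GC on its own: it moves from STANDING_BY to RUNNING only when the leader releases its gc_spin, and back towards the mutator only when continue_gc_threads releases its mut_spin, with the wakeup field serving purely as a polled progress indicator (this is what waitForGcThreads and shutdown_gc_threads in the patch spin on).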