 includes/rts/Flags.h |  8
 rts/Capability.c     |  1
 rts/Capability.h     |  3
 rts/RtsFlags.c       | 10
 rts/Schedule.c       | 70
 rts/sm/GC.c          | 28
 rts/sm/GCThread.h    |  1
 7 files changed, 110 insertions(+), 11 deletions(-)
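The new flag is driven from the command line like the other -q flags. A hypothetical invocation (the program name and values are illustrative; -qi<n> itself is defined in the RtsFlags.c hunk below):

    ./myprog +RTS -N8 -qi2 -RTS

With -qi2, a Capability that has been idle for the last two GCs is not woken up for a non-load-balancing parallel GC; the default of 0 disables the behaviour entirely.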
diff --git a/includes/rts/Flags.h b/includes/rts/Flags.h
index 439b261fd8..10421f0ee9 100644
--- a/includes/rts/Flags.h
+++ b/includes/rts/Flags.h
@@ -170,6 +170,14 @@ struct PAR_FLAGS {
     unsigned int parGcLoadBalancingGen;
                                  /* do load-balancing in this
                                   * generation and higher only */
+
+    unsigned int parGcNoSyncWithIdle;
+                                 /* if a Capability has been idle for
+                                  * this many GCs, do not try to wake
+                                  * it up when doing a
+                                  * non-load-balancing parallel GC.
+                                  * (zero disables) */
+
     rtsBool      setAffinity;    /* force thread affinity with CPUs */
 };
 #endif /* THREADED_RTS */
diff --git a/rts/Capability.c b/rts/Capability.c
index 7ce23a12e6..41efb176fd 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -226,6 +226,7 @@ initCapability( Capability *cap, nat i )
 
     cap->no = i;
     cap->in_haskell        = rtsFalse;
+    cap->idle              = 0;
 
     cap->run_queue_hd      = END_TSO_QUEUE;
     cap->run_queue_tl      = END_TSO_QUEUE;
diff --git a/rts/Capability.h b/rts/Capability.h
index a4655dd36d..f60adf9de4 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -46,6 +46,9 @@ struct Capability_ {
     // catching unsafe call-ins.
     rtsBool in_haskell;
 
+    // Has there been any activity on this Capability since the last GC?
+    nat idle;
+
     // The run queue.  The Task owning this Capability has exclusive
     // access to its run queue, so can wake up threads without
     // taking a lock, and the common path through the scheduler is
diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c
index 033db38435..2685d2e945 100644
--- a/rts/RtsFlags.c
+++ b/rts/RtsFlags.c
@@ -195,6 +195,7 @@ void initRtsFlagsDefaults(void)
     RtsFlags.ParFlags.parGcGen                  = 0;
     RtsFlags.ParFlags.parGcLoadBalancingEnabled = rtsTrue;
     RtsFlags.ParFlags.parGcLoadBalancingGen     = 1;
+    RtsFlags.ParFlags.parGcNoSyncWithIdle       = 0;
     RtsFlags.ParFlags.setAffinity               = 0;
 #endif
 
@@ -367,6 +368,9 @@ usage_text[] = {
"            (default: 1, -qb alone turns off load-balancing)",
"  -qa       Use the OS to set thread affinity (experimental)",
"  -qm       Don't automatically migrate threads between CPUs",
+"  -qi<n>    If a processor has been idle for the last <n> GCs, do not",
+"            wake it up for a non-load-balancing parallel GC.",
+"            (0 disables, default: 0)",
 #endif
"  --install-signal-handlers=<yes|no>",
"     Install signal handlers (default: yes)",
@@ -1193,7 +1197,11 @@ error = rtsTrue;
                       = strtol(rts_argv[arg]+3, (char **) NULL, 10);
                 }
                 break;
-            case 'a':
+            case 'i':
+                RtsFlags.ParFlags.parGcNoSyncWithIdle
+                    = strtol(rts_argv[arg]+3, (char **) NULL, 10);
+                break;
+            case 'a':
                 RtsFlags.ParFlags.setAffinity = rtsTrue;
                 break;
             case 'm':
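The case 'i' handler above reuses the existing sub-flag convention: rts_argv[arg]+3 skips over the "-qi" prefix and strtol reads the numeric suffix (an empty suffix parses as 0, i.e. disabled). A minimal standalone sketch of that parsing step, outside the RTS (hypothetical main, for illustration only):

    #include <stdio.h>
    #include <stdlib.h>

    int main(void)
    {
        /* stands in for rts_argv[arg] when the flag is "-qi3" */
        const char *arg = "-qi3";

        /* skip the three prefix characters, parse the rest in base 10,
         * exactly as the RtsFlags.c hunk does */
        long n = strtol(arg + 3, (char **) NULL, 10);

        printf("parGcNoSyncWithIdle = %ld\n", n);   /* prints 3 */
        return 0;
    }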
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 13c886a071..eedff32ed1 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -27,6 +27,7 @@
 #include "ProfHeap.h"
 #include "Weak.h"
 #include "sm/GC.h" // waitForGcThreads, releaseGCThreads, N
+#include "sm/GCThread.h"
 #include "Sparks.h"
 #include "Capability.h"
 #include "Task.h"
@@ -115,6 +116,11 @@ Mutex sched_mutex;
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif
 
+// Local stats
+#ifdef THREADED_RTS
+static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
+#endif
+
 /* -----------------------------------------------------------------------------
  * static function prototypes
  * -------------------------------------------------------------------------- */
@@ -426,6 +432,7 @@ run_thread:
 
     cap->interrupt = 0;
     cap->in_haskell = rtsTrue;
+    cap->idle = 0;
 
     dirty_TSO(cap,t);
     dirty_STACK(cap,t->stackobj);
@@ -1413,6 +1420,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
 {
     rtsBool heap_census;
 #ifdef THREADED_RTS
+    rtsBool idle_cap[n_capabilities];
     rtsBool gc_type;
     nat i, sync;
 #endif
@@ -1482,8 +1490,51 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
     }
     else
     {
-        // multi-threaded GC: make sure all the Capabilities donate one
-        // GC thread each.
+        // If we are load-balancing collections in this
+        // generation, then we require all GC threads to participate
+        // in the collection.  Otherwise, we only require active
+        // threads to participate, and we set gc_threads[i]->idle for
+        // any idle capabilities.  The rationale here is that waking
+        // up an idle Capability takes much longer than just doing any
+        // GC work on its behalf.
+
+        if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
+            || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
+                N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
+            for (i=0; i < n_capabilities; i++) {
+                idle_cap[i] = rtsFalse;
+            }
+        } else {
+            for (i=0; i < n_capabilities; i++) {
+                if (i == cap->no ||
+                    capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
+                    idle_cap[i] = rtsFalse;
+                } else {
+                    idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+                    if (!idle_cap[i]) {
+                        n_failed_trygrab_idles++;
+                    } else {
+                        n_idle_caps++;
+                    }
+                }
+            }
+        }
+
+        // We set the gc_thread[i]->idle flag if that
+        // capability/thread is not participating in this collection.
+        // We also keep a local record of which capabilities are idle
+        // in idle_cap[], because scheduleDoGC() is re-entrant:
+        // another thread might start a GC as soon as we've finished
+        // this one, and thus the gc_thread[]->idle flags are invalid
+        // as soon as we release any threads after GC.  Getting this
+        // wrong leads to a rare and hard to debug deadlock!
+
+        for (i=0; i < n_capabilities; i++) {
+            gc_threads[i]->idle = idle_cap[i];
+            capabilities[i].idle++;
+        }
+
+        // For all capabilities participating in this GC, wait until
+        // they have stopped mutating and are standing by for GC.
         waitForGcThreads(cap);
 
 #if defined(THREADED_RTS)
@@ -1565,6 +1616,18 @@ delete_threads_and_gc:
     if (gc_type == SYNC_GC_PAR)
     {
         releaseGCThreads(cap);
+        for (i = 0; i < n_capabilities; i++) {
+            if (i != cap->no) {
+                if (idle_cap[i]) {
+                    ASSERT(capabilities[i].running_task == task);
+                    task->cap = &capabilities[i];
+                    releaseCapability(&capabilities[i]);
+                } else {
+                    ASSERT(capabilities[i].running_task != task);
+                }
+            }
+        }
+        task->cap = cap;
     }
 #endif
 
@@ -2278,6 +2341,9 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
 
     shutdownCapabilities(task, wait_foreign);
 
+    // debugBelch("n_failed_trygrab_idles = %d, n_idle_caps = %d\n",
+    //            n_failed_trygrab_idles, n_idle_caps);
+
     boundTaskExiting(task);
 }
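The capability-selection logic in scheduleDoGC boils down to a small predicate: skip a capability only when -qi is enabled, this is not a load-balancing collection, the capability is not the GC leader's own, and it has been idle for at least the threshold number of GCs. A self-contained sketch with simplified stand-in types (all names here are hypothetical, not RTS API):

    #include <stdio.h>
    #include <stdbool.h>

    #define N_CAPS 4

    static unsigned int idle_gcs[N_CAPS] = {0, 0, 3, 1}; /* Capability.idle  */
    static unsigned int qi_threshold     = 2;            /* -qi<n>, 0 = off  */
    static bool load_balancing_this_gen  = false;        /* -qb/-qg gating   */

    /* true => leave capability i asleep for this parallel GC */
    static bool skip_cap(unsigned int i, unsigned int leader)
    {
        if (qi_threshold == 0)       return false;  /* feature disabled   */
        if (load_balancing_this_gen) return false;  /* need every thread  */
        if (i == leader)             return false;  /* leader always runs */
        return idle_gcs[i] >= qi_threshold;
    }

    int main(void)
    {
        for (unsigned int i = 0; i < N_CAPS; i++)
            printf("cap %u: %s\n", i,
                   skip_cap(i, 0) ? "sits out" : "participates");
        return 0;
    }

Note that the real code does more than the predicate: a skipped capability must still be claimed with tryGrabCapability() (falling back to waking it if the claim fails), and the result is recorded both in gc_threads[i]->idle and in the local idle_cap[] snapshot, because the gc_threads[] flags can be overwritten by a re-entrant GC as soon as any thread is released.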
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index 3b65219927..928f4448d3 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -338,6 +338,13 @@ GarbageCollect (rtsBool force_major_gc,
       }
   } else {
       scavenge_capability_mut_lists(gct->cap);
+      for (n = 0; n < n_capabilities; n++) {
+          if (gc_threads[n]->idle) {
+              markCapability(mark_root, gct, &capabilities[n],
+                             rtsTrue/*don't mark sparks*/);
+              scavenge_capability_mut_lists(&capabilities[n]);
+          }
+      }
   }
 
   // follow roots from the CAF list (used by GHCi)
@@ -401,7 +408,11 @@ GarbageCollect (rtsBool force_major_gc,
           pruneSparkQueue(&capabilities[n]);
       }
   } else {
-      pruneSparkQueue(gct->cap);
+      for (n = 0; n < n_capabilities; n++) {
+          if (n == cap->no || gc_threads[n]->idle) {
+              pruneSparkQueue(&capabilities[n]);
+          }
+      }
   }
 #endif
 
@@ -808,6 +819,7 @@ new_gc_thread (nat n, gc_thread *t)
 #endif
 
     t->thread_index = n;
+    t->idle = rtsFalse;
     t->free_blocks = NULL;
     t->gc_count = 0;
 
@@ -1114,7 +1126,7 @@ waitForGcThreads (Capability *cap USED_IF_THREADS)
 
     while(retry) {
         for (i=0; i < n_threads; i++) {
-            if (i == me) continue;
+            if (i == me || gc_threads[i]->idle) continue;
            if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
                 prodCapability(&capabilities[i], cap->running_task);
             }
@@ -1122,7 +1134,7 @@
         for (j=0; j < 10; j++) {
             retry = rtsFalse;
             for (i=0; i < n_threads; i++) {
-                if (i == me) continue;
+                if (i == me || gc_threads[i]->idle) continue;
                 write_barrier();
                 interruptCapability(&capabilities[i]);
                 if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
@@ -1154,8 +1166,8 @@ wakeup_gc_threads (nat me USED_IF_THREADS)
     if (n_gc_threads == 1) return;
 
     for (i=0; i < n_gc_threads; i++) {
-        if (i == me) continue;
-        inc_running();
+        if (i == me || gc_threads[i]->idle) continue;
+        inc_running();
         debugTrace(DEBUG_gc, "waking up gc thread %d", i);
         if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY)
             barf("wakeup_gc_threads");
@@ -1178,7 +1190,7 @@ shutdown_gc_threads (nat me USED_IF_THREADS)
     if (n_gc_threads == 1) return;
 
     for (i=0; i < n_gc_threads; i++) {
-        if (i == me) continue;
+        if (i == me || gc_threads[i]->idle) continue;
         while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) {
             write_barrier();
         }
 #endif
@@ -1192,8 +1204,8 @@ releaseGCThreads (Capability *cap USED_IF_THREADS)
     const nat me = cap->no;
     nat i;
     for (i=0; i < n_threads; i++) {
-        if (i == me) continue;
-        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE)
+        if (i == me || gc_threads[i]->idle) continue;
+        if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE)
             barf("releaseGCThreads");
 
         gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
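Because skipped capabilities have no GC thread of their own in this cycle, the GC.c hunks make the leader mark and scavenge their roots, mutable lists, and spark queues on its behalf, and exclude them from every wakeup/standby handshake. A standalone sketch of that "adopt the idle workers' queues" shape (simplified, hypothetical names):

    #include <stdio.h>
    #include <stdbool.h>

    #define N_CAPS 4

    static bool is_idle[N_CAPS] = {false, false, true, true};

    /* stands in for markCapability + scavenge_capability_mut_lists */
    static void do_root_work(unsigned int cap_no)
    {
        printf("processing roots of capability %u\n", cap_no);
    }

    int main(void)
    {
        unsigned int me = 0;              /* the GC leader */

        do_root_work(me);                 /* leader's own roots */
        for (unsigned int i = 0; i < N_CAPS; i++)
            if (is_idle[i])
                do_root_work(i);          /* adopt idle caps' roots */
        /* active caps other than the leader scavenge their own roots
         * in their own GC threads, as in the n_gc_threads > 1 branch */
        return 0;
    }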
diff --git a/rts/sm/GCThread.h b/rts/sm/GCThread.h
index b4f325631f..60f721285d 100644
--- a/rts/sm/GCThread.h
+++ b/rts/sm/GCThread.h
@@ -125,6 +125,7 @@ typedef struct gc_thread_ {
     volatile rtsBool wakeup;
 #endif
     nat thread_index;           // a zero based index identifying the thread
+    rtsBool idle;               // sitting out of this GC cycle
 
     bdescr * free_blocks;       // a buffer of free blocks for this thread
                                 //  during GC without accessing the block
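Taken together, the patch gives Capability.idle a simple lifecycle: reset to zero whenever the capability runs Haskell code (the run_thread hunk in Schedule.c), incremented once per parallel GC, and compared against the -qi threshold when the next collection starts. A toy simulation of that lifecycle (hypothetical, not RTS code):

    #include <stdio.h>

    #define N_CAPS 2

    static unsigned int idle[N_CAPS];

    static void ran_haskell(unsigned int i) { idle[i] = 0; } /* run_thread */

    static void gc_happened(void)                            /* scheduleDoGC */
    {
        for (unsigned int i = 0; i < N_CAPS; i++)
            idle[i]++;
    }

    int main(void)
    {
        const unsigned int threshold = 2;   /* -qi2 */

        ran_haskell(0); gc_happened();      /* cap 1 missed one GC  */
        ran_haskell(0); gc_happened();      /* cap 1 missed two GCs */

        for (unsigned int i = 0; i < N_CAPS; i++)
            printf("cap %u idle=%u -> %s\n", i, idle[i],
                   idle[i] >= threshold ? "skippable" : "must wake");
        return 0;
    }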