summaryrefslogtreecommitdiff
path: root/rts/sm/GC.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2008-11-21 15:12:33 +0000
committerSimon Marlow <marlowsd@gmail.com>2008-11-21 15:12:33 +0000
commit3ebcd3deb769a03f4ded0fca2cf38201048c0214 (patch)
treeba4f0a6fc73550425a0db988bf4fbb9651d110aa /rts/sm/GC.c
parentc373ebdb90edee470ad6fa8277cbe7aa369f23f8 (diff)
downloadhaskell-3ebcd3deb769a03f4ded0fca2cf38201048c0214.tar.gz
Use mutator threads to do GC, instead of having a separate pool of GC threads
Previously, the GC had its own pool of threads to use as workers when doing parallel GC. There was a "leader", which was the mutator thread that initiated the GC, and the other threads were taken from the pool. This was simple and worked fine for sequential programs, where we did most of the benchmarking for the parallel GC, but falls down for parallel programs. When we have N mutator threads and N cores, at GC time we would have to stop N-1 mutator threads and start up N-1 GC threads, and hope that the OS schedules them all onto separate cores. It practice it doesn't, as you might expect. Now we use the mutator threads to do GC. This works quite nicely, particularly for parallel programs, where each mutator thread scans its own spark pool, which is probably in its cache anyway. There are some flag changes: -g<n> is removed (-g1 is still accepted for backwards compat). There's no way to have a different number of GC threads than mutator threads now. -q1 Use one OS thread for GC (turns off parallel GC) -qg<n> Use parallel GC for generations >= <n> (default: 1) Using parallel GC only for generations >=1 works well for sequential programs. Compiling an ordinary sequential program with -threaded and running it with -N2 or more should help if you do a lot of GC. I've found that adding -qg0 (do parallel GC for generation 0 too) speeds up some parallel programs, but slows down some sequential programs. Being conservative, I left the threshold at 1. ToDo: document the new options.
Diffstat (limited to 'rts/sm/GC.c')
-rw-r--r--rts/sm/GC.c237
1 files changed, 130 insertions, 107 deletions
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index aff33201a9..bf2464bb15 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -138,7 +138,6 @@ DECLARE_GCT
static void mark_root (void *user, StgClosure **root);
static void zero_static_object_list (StgClosure* first_static);
static nat initialise_N (rtsBool force_major_gc);
-static void alloc_gc_threads (void);
static void init_collected_gen (nat g, nat threads);
static void init_uncollected_gen (nat g, nat threads);
static void init_gc_thread (gc_thread *t);
@@ -149,8 +148,9 @@ static void start_gc_threads (void);
static void scavenge_until_all_done (void);
static nat inc_running (void);
static nat dec_running (void);
-static void wakeup_gc_threads (nat n_threads);
-static void shutdown_gc_threads (nat n_threads);
+static void wakeup_gc_threads (nat n_threads, nat me);
+static void shutdown_gc_threads (nat n_threads, nat me);
+static void continue_gc_threads (nat n_threads, nat me);
#if 0 && defined(DEBUG)
static void gcCAFs (void);
@@ -180,7 +180,9 @@ StgPtr oldgen_scan;
-------------------------------------------------------------------------- */
void
-GarbageCollect ( rtsBool force_major_gc )
+GarbageCollect (rtsBool force_major_gc,
+ nat gc_type USED_IF_THREADS,
+ Capability *cap USED_IF_THREADS)
{
bdescr *bd;
step *stp;
@@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc )
*/
n = initialise_N(force_major_gc);
- /* Allocate + initialise the gc_thread structures.
- */
- alloc_gc_threads();
-
/* Start threads, so they can be spinning up while we finish initialisation.
*/
start_gc_threads();
+#if defined(THREADED_RTS)
/* How many threads will be participating in this GC?
- * We don't try to parallelise minor GC, or mark/compact/sweep GC.
+ * We don't try to parallelise minor GCs (unless the user asks for
+ * it with +RTS -gn0), or mark/compact/sweep GC.
*/
-#if defined(THREADED_RTS)
- if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
- n_gc_threads = 1;
+ if (gc_type == PENDING_GC_PAR) {
+ n_gc_threads = RtsFlags.ParFlags.nNodes;
} else {
- n_gc_threads = RtsFlags.ParFlags.gcThreads;
+ n_gc_threads = 1;
}
#else
n_gc_threads = 1;
#endif
+
trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
@@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc )
}
// this is the main thread
+#ifdef THREADED_RTS
+ if (n_gc_threads == 1) {
+ gct = gc_threads[0];
+ } else {
+ gct = gc_threads[cap->no];
+ }
+#else
gct = gc_threads[0];
+#endif
/* -----------------------------------------------------------------------
* follow all the roots that we know about:
@@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc )
// NB. do this after the mutable lists have been saved above, otherwise
// the other GC threads will be writing into the old mutable lists.
inc_running();
- wakeup_gc_threads(n_gc_threads);
+ wakeup_gc_threads(n_gc_threads, gct->thread_index);
for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
scavenge_mutable_list(&generations[g]);
@@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc )
break;
}
- shutdown_gc_threads(n_gc_threads);
+ shutdown_gc_threads(n_gc_threads, gct->thread_index);
// Update pointers from the Task list
update_task_list();
@@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc )
slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
+ // Guess which generation we'll collect *next* time
+ initialise_N(force_major_gc);
+
#if defined(RTS_USER_SIGNALS)
if (RtsFlags.MiscFlags.install_signal_handlers) {
// unblock signals again
@@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc )
}
#endif
+ continue_gc_threads(n_gc_threads, gct->thread_index);
+
RELEASE_SM_LOCK;
gct = saved_gct;
@@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc)
Initialise the gc_thread structures.
-------------------------------------------------------------------------- */
+#define GC_THREAD_INACTIVE 0
+#define GC_THREAD_STANDING_BY 1
+#define GC_THREAD_RUNNING 2
+#define GC_THREAD_WAITING_TO_CONTINUE 3
+
static gc_thread *
alloc_gc_thread (int n)
{
@@ -826,11 +844,11 @@ alloc_gc_thread (int n)
#ifdef THREADED_RTS
t->id = 0;
- initCondition(&t->wake_cond);
- initMutex(&t->wake_mutex);
- t->wakeup = rtsTrue; // starts true, so we can wait for the
+ initSpinLock(&t->gc_spin);
+ initSpinLock(&t->mut_spin);
+ ACQUIRE_SPIN_LOCK(&t->gc_spin);
+ t->wakeup = GC_THREAD_INACTIVE; // starts true, so we can wait for the
// thread to start up, see wakeup_gc_threads
- t->exit = rtsFalse;
#endif
t->thread_index = n;
@@ -864,17 +882,17 @@ alloc_gc_thread (int n)
}
-static void
-alloc_gc_threads (void)
+void
+initGcThreads (void)
{
if (gc_threads == NULL) {
#if defined(THREADED_RTS)
nat i;
- gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads *
+ gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes *
sizeof(gc_thread*),
"alloc_gc_threads");
- for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
gc_threads[i] = alloc_gc_thread(i);
}
#else
@@ -992,113 +1010,107 @@ loop:
}
#if defined(THREADED_RTS)
-//
-// gc_thread_work(): Scavenge until there's no work left to do and all
-// the running threads are idle.
-//
-static void
-gc_thread_work (void)
+
+void
+gcWorkerThread (Capability *cap)
{
- // gc_running_threads has already been incremented for us; this is
- // a worker thread and the main thread bumped gc_running_threads
- // before waking us up.
+ cap->in_gc = rtsTrue;
+
+ gct = gc_threads[cap->no];
+ gct->id = osThreadId();
+ // Wait until we're told to wake up
+ RELEASE_SPIN_LOCK(&gct->mut_spin);
+ gct->wakeup = GC_THREAD_STANDING_BY;
+ debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
+ ACQUIRE_SPIN_LOCK(&gct->gc_spin);
+
+#ifdef USE_PAPI
+ // start performance counters in this thread...
+ if (gct->papi_events == -1) {
+ papi_init_eventset(&gct->papi_events);
+ }
+ papi_thread_start_gc1_count(gct->papi_events);
+#endif
+
// Every thread evacuates some roots.
gct->evac_step = 0;
markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
rtsTrue/*prune sparks*/);
scavenge_until_all_done();
-}
-
-
-static void
-gc_thread_mainloop (void)
-{
- while (!gct->exit) {
-
- // Wait until we're told to wake up
- ACQUIRE_LOCK(&gct->wake_mutex);
- gct->wakeup = rtsFalse;
- while (!gct->wakeup) {
- debugTrace(DEBUG_gc, "GC thread %d standing by...",
- gct->thread_index);
- waitCondition(&gct->wake_cond, &gct->wake_mutex);
- }
- RELEASE_LOCK(&gct->wake_mutex);
- if (gct->exit) break;
-
+
#ifdef USE_PAPI
- // start performance counters in this thread...
- if (gct->papi_events == -1) {
- papi_init_eventset(&gct->papi_events);
- }
- papi_thread_start_gc1_count(gct->papi_events);
+ // count events in this thread towards the GC totals
+ papi_thread_stop_gc1_count(gct->papi_events);
#endif
- gc_thread_work();
+ // Wait until we're told to continue
+ RELEASE_SPIN_LOCK(&gct->gc_spin);
+ gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
+ debugTrace(DEBUG_gc, "GC thread %d waiting to continue...",
+ gct->thread_index);
+ ACQUIRE_SPIN_LOCK(&gct->mut_spin);
+ debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
+}
-#ifdef USE_PAPI
- // count events in this thread towards the GC totals
- papi_thread_stop_gc1_count(gct->papi_events);
-#endif
- }
-}
#endif
-#if defined(THREADED_RTS)
-static void
-gc_thread_entry (gc_thread *my_gct)
+void
+waitForGcThreads (Capability *cap USED_IF_THREADS)
{
- gct = my_gct;
- debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index);
- gct->id = osThreadId();
- gc_thread_mainloop();
-}
+#if defined(THREADED_RTS)
+ nat n_threads = RtsFlags.ParFlags.nNodes;
+ nat me = cap->no;
+ nat i, j;
+ rtsBool retry = rtsTrue;
+
+ while(retry) {
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ prodCapability(&capabilities[i], cap->running_task);
+ }
+ }
+ for (j=0; j < 10000000; j++) {
+ retry = rtsFalse;
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ write_barrier();
+ setContextSwitches();
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ retry = rtsTrue;
+ }
+ }
+ if (!retry) break;
+ }
+ }
#endif
+}
static void
start_gc_threads (void)
{
#if defined(THREADED_RTS)
- nat i;
- OSThreadId id;
- static rtsBool done = rtsFalse;
-
gc_running_threads = 0;
initMutex(&gc_running_mutex);
-
- if (!done) {
- // Start from 1: the main thread is 0
- for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
- createOSThread(&id, (OSThreadProc*)&gc_thread_entry,
- gc_threads[i]);
- }
- done = rtsTrue;
- }
#endif
}
static void
-wakeup_gc_threads (nat n_threads USED_IF_THREADS)
+wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
{
#if defined(THREADED_RTS)
nat i;
- for (i=1; i < n_threads; i++) {
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
inc_running();
debugTrace(DEBUG_gc, "waking up gc thread %d", i);
- do {
- ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
- if (gc_threads[i]->wakeup) {
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
- continue;
- } else {
- break;
- }
- } while (1);
- gc_threads[i]->wakeup = rtsTrue;
- signalCondition(&gc_threads[i]->wake_cond);
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads");
+
+ gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+ ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
+ RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
}
#endif
}
@@ -1107,18 +1119,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS)
// standby state, otherwise they may still be executing inside
// any_work(), and may even remain awake until the next GC starts.
static void
-shutdown_gc_threads (nat n_threads USED_IF_THREADS)
+shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
{
#if defined(THREADED_RTS)
nat i;
- rtsBool wakeup;
- for (i=1; i < n_threads; i++) {
- do {
- ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
- wakeup = gc_threads[i]->wakeup;
- // wakeup is false while the thread is waiting
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
- } while (wakeup);
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); }
+ }
+#endif
+}
+
+static void
+continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+
+ gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+ ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
+ RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
}
#endif
}