-rw-r--r--  includes/RtsFlags.h    4
-rw-r--r--  includes/Storage.h     2
-rw-r--r--  rts/Capability.c      74
-rw-r--r--  rts/Capability.h       6
-rw-r--r--  rts/RtsFlags.c        42
-rw-r--r--  rts/Schedule.c       117
-rw-r--r--  rts/Stats.c            4
-rw-r--r--  rts/sm/GC.c          237
-rw-r--r--  rts/sm/GC.h            4
-rw-r--r--  rts/sm/GCThread.h      7
-rw-r--r--  rts/sm/Storage.c       4
11 files changed, 278 insertions, 223 deletions
diff --git a/includes/RtsFlags.h b/includes/RtsFlags.h
index 55b00bb2a0..e14c9405ff 100644
--- a/includes/RtsFlags.h
+++ b/includes/RtsFlags.h
@@ -179,7 +179,9 @@ struct PAR_FLAGS {
rtsBool migrate; /* migrate threads between capabilities */
rtsBool wakeupMigrate; /* migrate a thread on wakeup */
unsigned int maxLocalSparks;
- nat gcThreads; /* number of threads for parallel GC */
+ rtsBool parGcEnabled; /* enable parallel GC */
+ nat parGcGen; /* do parallel GC in this generation
+ * and higher only */
};
#endif /* THREADED_RTS */
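
The two new flags replace the old per-GC thread count: when parallel GC runs at all, it now uses one GC thread per capability (nNodes). As a sketch only — the patch inlines this test in scheduleDoGC() (rts/Schedule.c below), which additionally requires sched_state < SCHED_INTERRUPTING — the predicate the flags drive amounts to:

    /* hypothetical helper, not present in the patch */
    static rtsBool useParallelGC (nat gen_to_collect)
    {
        return RtsFlags.ParFlags.parGcEnabled          /* not disabled by -q1 */
            && gen_to_collect >= RtsFlags.ParFlags.parGcGen  /* -qg<n> threshold */
            && !oldest_gen->steps[0].mark;             /* not mark/compact/sweep */
    }
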
diff --git a/includes/Storage.h b/includes/Storage.h
index d431298af9..0a7aae6750 100644
--- a/includes/Storage.h
+++ b/includes/Storage.h
@@ -220,7 +220,7 @@ extern bdescr * splitLargeBlock (bdescr *bd, nat blocks);
-------------------------------------------------------------------------- */
-extern void GarbageCollect(rtsBool force_major_gc);
+extern void GarbageCollect(rtsBool force_major_gc, nat gc_type, Capability *cap);
/* -----------------------------------------------------------------------------
Generational garbage collection support
diff --git a/rts/Capability.c b/rts/Capability.c
index 8dddbc5d34..7c6ceb5c66 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -26,6 +26,7 @@
#include "Schedule.h"
#include "Sparks.h"
#include "Trace.h"
+#include "GC.h"
// one global capability, this is the Capability for non-threaded
// builds, and for +RTS -N1
@@ -190,6 +191,7 @@ initCapability( Capability *cap, nat i )
cap->no = i;
cap->in_haskell = rtsFalse;
+ cap->in_gc = rtsFalse;
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
@@ -358,14 +360,7 @@ releaseCapability_ (Capability* cap,
return;
}
- /* if waiting_for_gc was the reason to release the cap: thread
- comes from yieldCap->releaseAndQueueWorker. Unconditionally set
- cap. free and return (see default after the if-protected other
- special cases). Thread will wait on cond.var and re-acquire the
- same cap after GC (GC-triggering cap. calls releaseCap and
- enters the spare_workers case)
- */
- if (waiting_for_gc) {
+ if (waiting_for_gc == PENDING_GC_SEQ) {
last_free_capability = cap; // needed?
trace(TRACE_sched | DEBUG_sched,
"GC pending, set capability %d free", cap->no);
@@ -557,6 +552,12 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
+ if (waiting_for_gc == PENDING_GC_PAR) {
+ debugTrace(DEBUG_sched, "capability %d: becoming a GC thread", cap->no);
+ gcWorkerThread(cap);
+ return;
+ }
+
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
@@ -655,58 +656,21 @@ wakeupThreadOnCapability (Capability *my_cap,
}
/* ----------------------------------------------------------------------------
- * prodCapabilities
+ * prodCapability
*
- * Used to indicate that the interrupted flag is now set, or some
- * other global condition that might require waking up a Task on each
- * Capability.
- * ------------------------------------------------------------------------- */
-
-static void
-prodCapabilities(rtsBool all)
-{
- nat i;
- Capability *cap;
- Task *task;
-
- for (i=0; i < n_capabilities; i++) {
- cap = &capabilities[i];
- ACQUIRE_LOCK(&cap->lock);
- if (!cap->running_task) {
- if (cap->spare_workers) {
- trace(TRACE_sched, "resuming capability %d", cap->no);
- task = cap->spare_workers;
- ASSERT(!task->stopped);
- giveCapabilityToTask(cap,task);
- if (!all) {
- RELEASE_LOCK(&cap->lock);
- return;
- }
- }
- }
- RELEASE_LOCK(&cap->lock);
- }
- return;
-}
-
-void
-prodAllCapabilities (void)
-{
- prodCapabilities(rtsTrue);
-}
-
-/* ----------------------------------------------------------------------------
- * prodOneCapability
- *
- * Like prodAllCapabilities, but we only require a single Task to wake
- * up in order to service some global event, such as checking for
- * deadlock after some idle time has passed.
+ * If a Capability is currently idle, wake up a Task on it. Used to
+ * get every Capability into the GC.
* ------------------------------------------------------------------------- */
void
-prodOneCapability (void)
+prodCapability (Capability *cap, Task *task)
{
- prodCapabilities(rtsFalse);
+ ACQUIRE_LOCK(&cap->lock);
+ if (!cap->running_task) {
+ cap->running_task = task;
+ releaseCapability_(cap,rtsTrue);
+ }
+ RELEASE_LOCK(&cap->lock);
}
/* ----------------------------------------------------------------------------
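
prodCapability has exactly one caller in this patch, waitForGcThreads() in rts/sm/GC.c below. A sketch of the interaction, using names from the patch:

    /* The GC initiator prods each capability whose GC thread has not
     * yet reached GC_THREAD_STANDING_BY; the woken task then finds
     * waiting_for_gc == PENDING_GC_PAR in yieldCapability() and calls
     * gcWorkerThread() instead of going back to sleep. */
    prodCapability(&capabilities[i], cap->running_task);
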
diff --git a/rts/Capability.h b/rts/Capability.h
index 89545780a4..478b0f1312 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -50,6 +50,9 @@ struct Capability_ {
// catching unsafe call-ins.
rtsBool in_haskell;
+ // true if this Capability is currently in the GC
+ rtsBool in_gc;
+
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
@@ -191,6 +194,8 @@ extern Capability *capabilities;
extern Capability *last_free_capability;
// GC indicator, in scope for the scheduler
+#define PENDING_GC_SEQ 1
+#define PENDING_GC_PAR 2
extern volatile StgWord waiting_for_gc;
// Acquires a capability at a return point. If *cap is non-NULL, then
@@ -237,6 +242,7 @@ void wakeupThreadOnCapability (Capability *my_cap, Capability *other_cap,
// need to service some global event.
//
void prodOneCapability (void);
+void prodCapability (Capability *cap, Task *task);
// Similar to prodOneCapability(), but prods all of them.
//
diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c
index 1cbd569d6b..cce2b28fed 100644
--- a/rts/RtsFlags.c
+++ b/rts/RtsFlags.c
@@ -214,7 +214,8 @@ void initRtsFlagsDefaults(void)
RtsFlags.ParFlags.nNodes = 1;
RtsFlags.ParFlags.migrate = rtsTrue;
RtsFlags.ParFlags.wakeupMigrate = rtsFalse;
- RtsFlags.ParFlags.gcThreads = 1;
+ RtsFlags.ParFlags.parGcEnabled = rtsTrue;
+ RtsFlags.ParFlags.parGcGen = 1;
#endif
#ifdef PAR
@@ -450,8 +451,9 @@ usage_text[] = {
"",
#endif /* DEBUG */
#if defined(THREADED_RTS) && !defined(NOSMP)
-" -N<n> Use <n> OS threads (default: 1) (also sets -g)",
-" -g<n> Use <n> OS threads for GC (default: 1)",
+" -N<n> Use <n> OS threads (default: 1)",
+" -q1 Use one OS thread for GC (turns off parallel GC)",
+" -qg<n> Use parallel GC only for generations >= <n> (default: 1)",
" -qm Don't automatically migrate threads between CPUs",
" -qw Migrate a thread to the current CPU when it is woken up",
#endif
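
For illustration (hypothetical program name; behaviour as documented in the usage text above), the new options compose like this:

    /* ./prog +RTS -N4 -RTS        -- 4 capabilities; parallel GC for
     *                                generations >= 1 (the default)
     * ./prog +RTS -N4 -qg0 -RTS   -- parallel GC for every generation,
     *                                including minor collections
     * ./prog +RTS -N4 -q1 -RTS    -- single-threaded GC (as does -g1,
     *                                kept for backwards compatibility)
     */
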
@@ -1132,8 +1134,6 @@ error = rtsTrue;
if (rts_argv[arg][2] != '\0') {
RtsFlags.ParFlags.nNodes
= strtol(rts_argv[arg]+2, (char **) NULL, 10);
- // set -g at the same time as -N by default
- RtsFlags.ParFlags.gcThreads = RtsFlags.ParFlags.nNodes;
if (RtsFlags.ParFlags.nNodes <= 0) {
errorBelch("bad value for -N");
error = rtsTrue;
@@ -1149,15 +1149,17 @@ error = rtsTrue;
case 'g':
THREADED_BUILD_ONLY(
- if (rts_argv[arg][2] != '\0') {
- RtsFlags.ParFlags.gcThreads
- = strtol(rts_argv[arg]+2, (char **) NULL, 10);
- if (RtsFlags.ParFlags.gcThreads <= 0) {
- errorBelch("bad value for -g");
- error = rtsTrue;
- }
- }
- ) break;
+ switch (rts_argv[arg][2]) {
+ case '1':
+ // backwards compat only
+ RtsFlags.ParFlags.parGcEnabled = rtsFalse;
+ break;
+ default:
+ errorBelch("unknown RTS option: %s",rts_argv[arg]);
+ error = rtsTrue;
+ break;
+ }
+ ) break;
case 'q':
switch (rts_argv[arg][2]) {
@@ -1165,6 +1167,18 @@ error = rtsTrue;
errorBelch("incomplete RTS option: %s",rts_argv[arg]);
error = rtsTrue;
break;
+ case '1':
+ RtsFlags.ParFlags.parGcEnabled = rtsFalse;
+ break;
+ case 'g':
+ if (rts_argv[arg][3] != '\0') {
+ RtsFlags.ParFlags.parGcGen
+ = strtol(rts_argv[arg]+3, (char **) NULL, 10);
+ } else {
+ errorBelch("bad value for -qg");
+ error = rtsTrue;
+ }
+ break;
case 'm':
RtsFlags.ParFlags.migrate = rtsFalse;
break;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 7dd063423f..31a487515a 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -31,6 +31,7 @@
#include "Updates.h"
#include "Proftimer.h"
#include "ProfHeap.h"
+#include "GC.h"
/* PARALLEL_HASKELL includes go here */
@@ -1478,7 +1479,7 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
#ifdef THREADED_RTS
/* extern static volatile StgWord waiting_for_gc;
lives inside capability.c */
- rtsBool was_waiting;
+ nat gc_type, prev_pending_gc;
nat i;
#endif
@@ -1490,6 +1491,16 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
}
#ifdef THREADED_RTS
+ if (sched_state < SCHED_INTERRUPTING
+ && RtsFlags.ParFlags.parGcEnabled
+ && N >= RtsFlags.ParFlags.parGcGen
+ && ! oldest_gen->steps[0].mark)
+ {
+ gc_type = PENDING_GC_PAR;
+ } else {
+ gc_type = PENDING_GC_SEQ;
+ }
+
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
@@ -1500,39 +1511,55 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
// actually did the GC. But it's quite hard to arrange for all
// the other tasks to sleep and stay asleep.
//
-
+
/* Other capabilities are prevented from running yet more Haskell
threads if waiting_for_gc is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
- was_waiting = cas(&waiting_for_gc, 0, 1);
- if (was_waiting) {
+ prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
+ if (prev_pending_gc) {
do {
- debugTrace(DEBUG_sched, "someone else is trying to GC...");
- if (cap) yieldCapability(&cap,task);
+ debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
+ prev_pending_gc);
+ ASSERT(cap);
+ yieldCapability(&cap,task);
} while (waiting_for_gc);
return cap; // NOTE: task->cap might have changed here
}
setContextSwitches();
- for (i=0; i < n_capabilities; i++) {
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
- if (cap != &capabilities[i]) {
- Capability *pcap = &capabilities[i];
- // we better hope this task doesn't get migrated to
- // another Capability while we're waiting for this one.
- // It won't, because load balancing happens while we have
- // all the Capabilities, but even so it's a slightly
- // unsavoury invariant.
- task->cap = pcap;
- waitForReturnCapability(&pcap, task);
- if (pcap != &capabilities[i]) {
- barf("scheduleDoGC: got the wrong capability");
- }
- }
+
+ // The final shutdown GC is always single-threaded, because it's
+ // possible that some of the Capabilities have no worker threads.
+
+ if (gc_type == PENDING_GC_SEQ)
+ {
+ // single-threaded GC: grab all the capabilities
+ for (i=0; i < n_capabilities; i++) {
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
+ if (cap != &capabilities[i]) {
+ Capability *pcap = &capabilities[i];
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = pcap;
+ waitForReturnCapability(&pcap, task);
+ if (pcap != &capabilities[i]) {
+ barf("scheduleDoGC: got the wrong capability");
+ }
+ }
+ }
}
+ else
+ {
+ // multi-threaded GC: make sure all the Capabilities donate one
+ // GC thread each.
+ debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
- waiting_for_gc = rtsFalse;
+ waitForGcThreads(cap);
+ }
#endif
// so this happens periodically:
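
Pulling the threaded-RTS pieces together: with the defaults, a minor GC (N = 0 < parGcGen) still grabs all capabilities and collects sequentially, while a major GC takes the parallel path. A compressed sketch of the rendezvous (error handling, the heap-census argument, and the sequential branch omitted; useParallelGC is the hypothetical helper sketched earlier):

    gc_type = useParallelGC(N) ? PENDING_GC_PAR : PENDING_GC_SEQ;
    if (cas(&waiting_for_gc, 0, gc_type) != 0) {
        // lost the race: yield; under PENDING_GC_PAR this turns the
        // task into a GC worker via gcWorkerThread()
        do { yieldCapability(&cap, task); } while (waiting_for_gc);
    } else if (gc_type == PENDING_GC_PAR) {
        waitForGcThreads(cap);      // every other cap standing by
        waiting_for_gc = 0;         // reset *before* GC, see below
        GarbageCollect(force_major, gc_type, cap);
    }
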
@@ -1545,23 +1572,23 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
* state, then we should take the opportunity to delete all the
* threads in the system.
*/
- if (sched_state >= SCHED_INTERRUPTING) {
- deleteAllThreads(&capabilities[0]);
+ if (sched_state == SCHED_INTERRUPTING) {
+ deleteAllThreads(cap);
sched_state = SCHED_SHUTTING_DOWN;
}
heap_census = scheduleNeedHeapProfile(rtsTrue);
- /* everybody back, start the GC.
- * Could do it in this thread, or signal a condition var
- * to do it in another thread. Either way, we need to
- * broadcast on gc_pending_cond afterward.
- */
#if defined(THREADED_RTS)
debugTrace(DEBUG_sched, "doing GC");
+ // reset waiting_for_gc *before* GC, so that when the GC threads
+ // emerge they don't immediately re-enter the GC.
+ waiting_for_gc = 0;
+ GarbageCollect(force_major || heap_census, gc_type, cap);
+#else
+ GarbageCollect(force_major || heap_census, 0, cap);
#endif
- GarbageCollect(force_major || heap_census);
-
+
if (heap_census) {
debugTrace(DEBUG_sched, "performing heap census");
heapCensus();
@@ -1587,12 +1614,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
}
#if defined(THREADED_RTS)
- // release our stash of capabilities.
- for (i = 0; i < n_capabilities; i++) {
- if (cap != &capabilities[i]) {
- task->cap = &capabilities[i];
- releaseCapability(&capabilities[i]);
- }
+ if (gc_type == PENDING_GC_SEQ) {
+ // release our stash of capabilities.
+ for (i = 0; i < n_capabilities; i++) {
+ if (cap != &capabilities[i]) {
+ task->cap = &capabilities[i];
+ releaseCapability(&capabilities[i]);
+ }
+ }
}
if (cap) {
task->cap = cap;
@@ -2131,7 +2160,13 @@ exitScheduler(
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
- scheduleDoGC(NULL,task,rtsFalse);
+#if defined(THREADED_RTS)
+ waitForReturnCapability(&task->cap,task);
+ scheduleDoGC(task->cap,task,rtsFalse);
+ releaseCapability(task->cap);
+#else
+ scheduleDoGC(&MainCapability,task,rtsFalse);
+#endif
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2184,13 +2219,17 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
+
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalling_tasks queue.
ACQUIRE_LOCK(&sched_mutex);
task = newBoundTask();
RELEASE_LOCK(&sched_mutex);
- scheduleDoGC(NULL,task,force_major);
+
+ waitForReturnCapability(&task->cap,task);
+ scheduleDoGC(task->cap,task,force_major);
+ releaseCapability(task->cap);
boundTaskExiting(task);
}
diff --git a/rts/Stats.c b/rts/Stats.c
index 228f0c021e..9c17856970 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -613,11 +613,11 @@ stat_exit(int alloc)
}
#if defined(THREADED_RTS)
- if (RtsFlags.ParFlags.gcThreads > 1) {
+ if (RtsFlags.ParFlags.parGcEnabled) {
statsPrintf("\n Parallel GC work balance: %.2f (%ld / %ld, ideal %d)\n",
(double)GC_par_avg_copied / (double)GC_par_max_copied,
(lnat)GC_par_avg_copied, (lnat)GC_par_max_copied,
- RtsFlags.ParFlags.gcThreads
+ RtsFlags.ParFlags.nNodes
);
}
#endif
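
Only the reporting changes here: the statistic is now printed whenever parallel GC is enabled, and the "ideal" figure comes from nNodes, since the GC thread count always equals the capability count after this patch. Reading off the printed expression:

    /* the ratio printed above; avg == max would print 1.0, and the
     * "ideal" thread count is RtsFlags.ParFlags.nNodes */
    double balance = (double)GC_par_avg_copied / (double)GC_par_max_copied;
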
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index aff33201a9..bf2464bb15 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -138,7 +138,6 @@ DECLARE_GCT
static void mark_root (void *user, StgClosure **root);
static void zero_static_object_list (StgClosure* first_static);
static nat initialise_N (rtsBool force_major_gc);
-static void alloc_gc_threads (void);
static void init_collected_gen (nat g, nat threads);
static void init_uncollected_gen (nat g, nat threads);
static void init_gc_thread (gc_thread *t);
@@ -149,8 +148,9 @@ static void start_gc_threads (void);
static void scavenge_until_all_done (void);
static nat inc_running (void);
static nat dec_running (void);
-static void wakeup_gc_threads (nat n_threads);
-static void shutdown_gc_threads (nat n_threads);
+static void wakeup_gc_threads (nat n_threads, nat me);
+static void shutdown_gc_threads (nat n_threads, nat me);
+static void continue_gc_threads (nat n_threads, nat me);
#if 0 && defined(DEBUG)
static void gcCAFs (void);
@@ -180,7 +180,9 @@ StgPtr oldgen_scan;
-------------------------------------------------------------------------- */
void
-GarbageCollect ( rtsBool force_major_gc )
+GarbageCollect (rtsBool force_major_gc,
+ nat gc_type USED_IF_THREADS,
+ Capability *cap USED_IF_THREADS)
{
bdescr *bd;
step *stp;
@@ -234,26 +236,24 @@ GarbageCollect ( rtsBool force_major_gc )
*/
n = initialise_N(force_major_gc);
- /* Allocate + initialise the gc_thread structures.
- */
- alloc_gc_threads();
-
/* Start threads, so they can be spinning up while we finish initialisation.
*/
start_gc_threads();
+#if defined(THREADED_RTS)
/* How many threads will be participating in this GC?
- * We don't try to parallelise minor GC, or mark/compact/sweep GC.
+ * We don't try to parallelise minor GCs (unless the user asks for
+ * it with +RTS -qg0), or mark/compact/sweep GC.
*/
-#if defined(THREADED_RTS)
- if (n < (4*1024*1024 / BLOCK_SIZE) || oldest_gen->steps[0].mark) {
- n_gc_threads = 1;
+ if (gc_type == PENDING_GC_PAR) {
+ n_gc_threads = RtsFlags.ParFlags.nNodes;
} else {
- n_gc_threads = RtsFlags.ParFlags.gcThreads;
+ n_gc_threads = 1;
}
#else
n_gc_threads = 1;
#endif
+
trace(TRACE_gc|DEBUG_gc, "GC (gen %d): %d KB to collect, %ld MB in use, using %d thread(s)",
N, n * (BLOCK_SIZE / 1024), mblocks_allocated, n_gc_threads);
@@ -302,7 +302,15 @@ GarbageCollect ( rtsBool force_major_gc )
}
// this is the main thread
+#ifdef THREADED_RTS
+ if (n_gc_threads == 1) {
+ gct = gc_threads[0];
+ } else {
+ gct = gc_threads[cap->no];
+ }
+#else
gct = gc_threads[0];
+#endif
/* -----------------------------------------------------------------------
* follow all the roots that we know about:
@@ -323,7 +331,7 @@ GarbageCollect ( rtsBool force_major_gc )
// NB. do this after the mutable lists have been saved above, otherwise
// the other GC threads will be writing into the old mutable lists.
inc_running();
- wakeup_gc_threads(n_gc_threads);
+ wakeup_gc_threads(n_gc_threads, gct->thread_index);
for (g = RtsFlags.GcFlags.generations-1; g > N; g--) {
scavenge_mutable_list(&generations[g]);
@@ -378,7 +386,7 @@ GarbageCollect ( rtsBool force_major_gc )
break;
}
- shutdown_gc_threads(n_gc_threads);
+ shutdown_gc_threads(n_gc_threads, gct->thread_index);
// Update pointers from the Task list
update_task_list();
@@ -756,6 +764,9 @@ GarbageCollect ( rtsBool force_major_gc )
slop = calcLiveBlocks() * BLOCK_SIZE_W - live;
stat_endGC(allocated, live, copied, N, max_copied, avg_copied, slop);
+ // Guess which generation we'll collect *next* time
+ initialise_N(force_major_gc);
+
#if defined(RTS_USER_SIGNALS)
if (RtsFlags.MiscFlags.install_signal_handlers) {
// unblock signals again
@@ -763,6 +774,8 @@ GarbageCollect ( rtsBool force_major_gc )
}
#endif
+ continue_gc_threads(n_gc_threads, gct->thread_index);
+
RELEASE_SM_LOCK;
gct = saved_gct;
@@ -814,6 +827,11 @@ initialise_N (rtsBool force_major_gc)
Initialise the gc_thread structures.
-------------------------------------------------------------------------- */
+#define GC_THREAD_INACTIVE 0
+#define GC_THREAD_STANDING_BY 1
+#define GC_THREAD_RUNNING 2
+#define GC_THREAD_WAITING_TO_CONTINUE 3
+
static gc_thread *
alloc_gc_thread (int n)
{
@@ -826,11 +844,11 @@ alloc_gc_thread (int n)
#ifdef THREADED_RTS
t->id = 0;
- initCondition(&t->wake_cond);
- initMutex(&t->wake_mutex);
- t->wakeup = rtsTrue; // starts true, so we can wait for the
+ initSpinLock(&t->gc_spin);
+ initSpinLock(&t->mut_spin);
+ ACQUIRE_SPIN_LOCK(&t->gc_spin);
+ t->wakeup = GC_THREAD_INACTIVE; // starts inactive, so we can wait for the
// thread to start up, see wakeup_gc_threads
- t->exit = rtsFalse;
#endif
t->thread_index = n;
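
The four states and two spinlocks implement a simple handshake in which ownership of gc_spin and mut_spin passes back and forth; a sketch of the protocol (worker side is gcWorkerThread below, initiator side is wakeup_gc_threads/continue_gc_threads — the initiator starts out holding gc_spin, per the ACQUIRE above):

    /*   worker                          initiator
     *   ------                          ---------
     *   RELEASE(mut_spin)
     *   wakeup = STANDING_BY            -- waitForGcThreads() polls this
     *   ACQUIRE(gc_spin)   // blocks
     *                                   wakeup = RUNNING
     *                                   ACQUIRE(mut_spin)
     *                                   RELEASE(gc_spin)  // worker runs
     *   ... scavenge ...
     *   RELEASE(gc_spin)
     *   wakeup = WAITING_TO_CONTINUE    -- shutdown_gc_threads() spins
     *   ACQUIRE(mut_spin)  // blocks       on this
     *                                   wakeup = INACTIVE
     *                                   ACQUIRE(gc_spin)
     *                                   RELEASE(mut_spin) // worker returns
     */
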
@@ -864,17 +882,17 @@ alloc_gc_thread (int n)
}
-static void
-alloc_gc_threads (void)
+void
+initGcThreads (void)
{
if (gc_threads == NULL) {
#if defined(THREADED_RTS)
nat i;
- gc_threads = stgMallocBytes (RtsFlags.ParFlags.gcThreads *
+ gc_threads = stgMallocBytes (RtsFlags.ParFlags.nNodes *
sizeof(gc_thread*),
"alloc_gc_threads");
- for (i = 0; i < RtsFlags.ParFlags.gcThreads; i++) {
+ for (i = 0; i < RtsFlags.ParFlags.nNodes; i++) {
gc_threads[i] = alloc_gc_thread(i);
}
#else
@@ -992,113 +1010,107 @@ loop:
}
#if defined(THREADED_RTS)
-//
-// gc_thread_work(): Scavenge until there's no work left to do and all
-// the running threads are idle.
-//
-static void
-gc_thread_work (void)
+
+void
+gcWorkerThread (Capability *cap)
{
- // gc_running_threads has already been incremented for us; this is
- // a worker thread and the main thread bumped gc_running_threads
- // before waking us up.
+ cap->in_gc = rtsTrue;
+
+ gct = gc_threads[cap->no];
+ gct->id = osThreadId();
+ // Wait until we're told to wake up
+ RELEASE_SPIN_LOCK(&gct->mut_spin);
+ gct->wakeup = GC_THREAD_STANDING_BY;
+ debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index);
+ ACQUIRE_SPIN_LOCK(&gct->gc_spin);
+
+#ifdef USE_PAPI
+ // start performance counters in this thread...
+ if (gct->papi_events == -1) {
+ papi_init_eventset(&gct->papi_events);
+ }
+ papi_thread_start_gc1_count(gct->papi_events);
+#endif
+
// Every thread evacuates some roots.
gct->evac_step = 0;
markSomeCapabilities(mark_root, gct, gct->thread_index, n_gc_threads,
rtsTrue/*prune sparks*/);
scavenge_until_all_done();
-}
-
-
-static void
-gc_thread_mainloop (void)
-{
- while (!gct->exit) {
-
- // Wait until we're told to wake up
- ACQUIRE_LOCK(&gct->wake_mutex);
- gct->wakeup = rtsFalse;
- while (!gct->wakeup) {
- debugTrace(DEBUG_gc, "GC thread %d standing by...",
- gct->thread_index);
- waitCondition(&gct->wake_cond, &gct->wake_mutex);
- }
- RELEASE_LOCK(&gct->wake_mutex);
- if (gct->exit) break;
-
+
#ifdef USE_PAPI
- // start performance counters in this thread...
- if (gct->papi_events == -1) {
- papi_init_eventset(&gct->papi_events);
- }
- papi_thread_start_gc1_count(gct->papi_events);
+ // count events in this thread towards the GC totals
+ papi_thread_stop_gc1_count(gct->papi_events);
#endif
- gc_thread_work();
+ // Wait until we're told to continue
+ RELEASE_SPIN_LOCK(&gct->gc_spin);
+ gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
+ debugTrace(DEBUG_gc, "GC thread %d waiting to continue...",
+ gct->thread_index);
+ ACQUIRE_SPIN_LOCK(&gct->mut_spin);
+ debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
+}
-#ifdef USE_PAPI
- // count events in this thread towards the GC totals
- papi_thread_stop_gc1_count(gct->papi_events);
-#endif
- }
-}
#endif
-#if defined(THREADED_RTS)
-static void
-gc_thread_entry (gc_thread *my_gct)
+void
+waitForGcThreads (Capability *cap USED_IF_THREADS)
{
- gct = my_gct;
- debugTrace(DEBUG_gc, "GC thread %d starting...", gct->thread_index);
- gct->id = osThreadId();
- gc_thread_mainloop();
-}
+#if defined(THREADED_RTS)
+ nat n_threads = RtsFlags.ParFlags.nNodes;
+ nat me = cap->no;
+ nat i, j;
+ rtsBool retry = rtsTrue;
+
+ while(retry) {
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ prodCapability(&capabilities[i], cap->running_task);
+ }
+ }
+ for (j=0; j < 10000000; j++) {
+ retry = rtsFalse;
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ write_barrier();
+ setContextSwitches();
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ retry = rtsTrue;
+ }
+ }
+ if (!retry) break;
+ }
+ }
#endif
+}
static void
start_gc_threads (void)
{
#if defined(THREADED_RTS)
- nat i;
- OSThreadId id;
- static rtsBool done = rtsFalse;
-
gc_running_threads = 0;
initMutex(&gc_running_mutex);
-
- if (!done) {
- // Start from 1: the main thread is 0
- for (i = 1; i < RtsFlags.ParFlags.gcThreads; i++) {
- createOSThread(&id, (OSThreadProc*)&gc_thread_entry,
- gc_threads[i]);
- }
- done = rtsTrue;
- }
#endif
}
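
Note that start_gc_threads() no longer creates OS threads at all: a "GC thread" is now one of the existing capability tasks, redirected when it yields (from rts/Capability.c above):

    if (waiting_for_gc == PENDING_GC_PAR) {
        gcWorkerThread(cap);   // this task becomes gc_threads[cap->no]
        return;
    }
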
static void
-wakeup_gc_threads (nat n_threads USED_IF_THREADS)
+wakeup_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
{
#if defined(THREADED_RTS)
nat i;
- for (i=1; i < n_threads; i++) {
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
inc_running();
debugTrace(DEBUG_gc, "waking up gc thread %d", i);
- do {
- ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
- if (gc_threads[i]->wakeup) {
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
- continue;
- } else {
- break;
- }
- } while (1);
- gc_threads[i]->wakeup = rtsTrue;
- signalCondition(&gc_threads[i]->wake_cond);
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
+ if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads");
+
+ gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+ ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
+ RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
}
#endif
}
@@ -1107,18 +1119,29 @@ wakeup_gc_threads (nat n_threads USED_IF_THREADS)
// standby state, otherwise they may still be executing inside
// any_work(), and may even remain awake until the next GC starts.
static void
-shutdown_gc_threads (nat n_threads USED_IF_THREADS)
+shutdown_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
{
#if defined(THREADED_RTS)
nat i;
- rtsBool wakeup;
- for (i=1; i < n_threads; i++) {
- do {
- ACQUIRE_LOCK(&gc_threads[i]->wake_mutex);
- wakeup = gc_threads[i]->wakeup;
- // wakeup is false while the thread is waiting
- RELEASE_LOCK(&gc_threads[i]->wake_mutex);
- } while (wakeup);
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { write_barrier(); }
+ }
+#endif
+}
+
+static void
+continue_gc_threads (nat n_threads USED_IF_THREADS, nat me USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ nat i;
+ for (i=0; i < n_threads; i++) {
+ if (i == me) continue;
+ if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) barf("continue_gc_threads");
+
+ gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+ ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
+ RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
}
#endif
}
diff --git a/rts/sm/GC.h b/rts/sm/GC.h
index 6331320a30..5fb142f58f 100644
--- a/rts/sm/GC.h
+++ b/rts/sm/GC.h
@@ -40,6 +40,10 @@ extern SpinLock gc_alloc_block_sync;
extern StgWord64 whitehole_spin;
#endif
+void gcWorkerThread (Capability *cap);
+void initGcThreads (void);
+void waitForGcThreads (Capability *cap);
+
#define WORK_UNIT_WORDS 128
#endif /* GC_H */
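
Where the three new entry points are wired up in this patch:

    /* initGcThreads()       -- initStorage() (rts/sm/Storage.c), once
     *                          at RTS startup, sized by nNodes
     * gcWorkerThread(cap)   -- yieldCapability() (rts/Capability.c),
     *                          when waiting_for_gc == PENDING_GC_PAR
     * waitForGcThreads(cap) -- scheduleDoGC() (rts/Schedule.c), before
     *                          calling GarbageCollect()
     */
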
diff --git a/rts/sm/GCThread.h b/rts/sm/GCThread.h
index 1b5c5d4291..d6af2b1571 100644
--- a/rts/sm/GCThread.h
+++ b/rts/sm/GCThread.h
@@ -113,10 +113,9 @@ typedef struct step_workspace_ {
typedef struct gc_thread_ {
#ifdef THREADED_RTS
OSThreadId id; // The OS thread that this struct belongs to
- Mutex wake_mutex;
- Condition wake_cond; // So we can go to sleep between GCs
- rtsBool wakeup;
- rtsBool exit;
+ SpinLock gc_spin;
+ SpinLock mut_spin;
+ volatile nat wakeup; // one of the GC_THREAD_* states
#endif
nat thread_index; // a zero based index identifying the thread
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 6c45cbed59..bf7c452d9b 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -276,6 +276,10 @@ initStorage( void )
whitehole_spin = 0;
#endif
+ N = 0;
+
+ initGcThreads();
+
IF_DEBUG(gc, statDescribeGens());
RELEASE_SM_LOCK;