author     Simon Marlow <marlowsd@gmail.com>	2016-04-09 20:45:50 +0100
committer  Simon Marlow <smarlow@fb.com>	2016-05-04 05:30:30 -0700
commit     76ee260778991367b8dbf07ecf7afd31f826c824
tree       fbddddf878413dab3c01abf8108c26d2bd20db4c /rts
parent     f9d93751126e58fb990335095e02fd81a3595fde
Allow limiting the number of GC threads (+RTS -qn<n>)
This allows the GC to use fewer threads than the number of capabilities.
At each GC, we choose some of the capabilities to be "idle": any thread
running on an idle capability sleeps for the duration of the GC, and its
work is done by the remaining threads. Capabilities that are already idle
(if any) are chosen first.
The idea is that this helps in the following situation:
* We want to use a large -N value so as to make use of hyperthreaded
cores
* We use a large heap size, so GC is infrequent
* But we don't want to use all -N threads in the GC, because that
thrashes the memory too much.
See docs for usage.
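
For example, a run might look like this (the program name and the number of
threads are illustrative only, not part of this patch):

    $ ./prog +RTS -N16 -qn8 -RTS

Here the mutator runs on 16 capabilities, but each parallel GC uses at most
8 of them; -qn must be between 1 and the value of -N.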
Diffstat (limited to 'rts')
 rts/Capability.c |  48
 rts/Capability.h |  23
 rts/RtsFlags.c   |  20
 rts/Schedule.c   | 277
 rts/Task.h       |  20
 rts/sm/GC.c      |   4

6 files changed, 271 insertions(+), 121 deletions(-)
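
To make the selection policy easier to follow before diving into the
Schedule.c hunks below, here is a standalone, simplified sketch of how
scheduleDoGC decides which capabilities sit out a parallel GC. All names
here are hypothetical; disabled capabilities and the retry-on-sync loop of
the real code are omitted for brevity:

    #include <stdbool.h>
    #include <stdio.h>

    /* Simplified model of the -qn selection in scheduleDoGC: busy[i] says
     * whether capability i currently has a running task, me is the
     * capability initiating the GC, gc_threads is the -qn value.  Sets
     * idle[i] = true for capabilities that should sleep through the GC. */
    static void choose_idle_caps(int n_caps, int me, int gc_threads,
                                 const bool busy[], bool idle[])
    {
        int need_idle = n_caps - gc_threads;
        if (need_idle < 0) need_idle = 0;

        /* First pass: prefer capabilities that are already inactive. */
        for (int i = 0; i < n_caps; i++) {
            if (need_idle > 0 && !busy[i]) {
                idle[i] = true;
                need_idle--;
            } else {
                idle[i] = false;
            }
        }

        /* Second pass: if that wasn't enough, idle some busy capabilities,
         * but never the one initiating the GC. */
        for (int i = 0; need_idle > 0 && i < n_caps; i++) {
            if (!idle[i] && i != me) {
                idle[i] = true;
                need_idle--;
            }
        }
    }

    int main(void)
    {
        /* 8 capabilities, -qn4: caps 2 and 5 are inactive, cap 0 GCs. */
        bool busy[8] = {true, true, false, true, true, false, true, true};
        bool idle[8];
        choose_idle_caps(8, 0, 4, busy, idle);
        for (int i = 0; i < 8; i++) {
            printf("cap %d: %s\n", i, idle[i] ? "idle" : "GC worker");
        }
        return 0;
    }

Note that the real code allocates the idle array afresh on every retry of
the sync request, because n_capabilities may change while the requesting
task is waiting.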
diff --git a/rts/Capability.c b/rts/Capability.c
index 355f36d0c5..aa77d1b357 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -55,10 +55,9 @@ static Capability *last_free_capability = NULL;

 /*
  * Indicates that the RTS wants to synchronise all the Capabilities
- * for some reason.  All Capabilities should stop and return to the
- * scheduler.
+ * for some reason.  All Capabilities should yieldCapability().
  */
-volatile StgWord pending_sync = 0;
+PendingSync * volatile pending_sync = 0;

 /* Let foreign code get the current Capability -- assuming there is one!
  * This is useful for unsafe foreign calls because they are called with
@@ -477,13 +476,19 @@ releaseCapability_ (Capability* cap,
         return;
     }

-    // If there is a pending sync, then we should just leave the
-    // Capability free.  The thread trying to sync will be about to
-    // call waitForCapability().
-    if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
-      last_free_capability = cap; // needed?
-      debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
-      return;
+    // If there is a pending sync, then we should just leave the Capability
+    // free.  The thread trying to sync will be about to call
+    // waitForCapability().
+    //
+    // Note: this is *after* we check for a returning task above,
+    // because the task attempting to acquire all the capabilities may
+    // be currently in waitForCapability() waiting for this
+    // capability, in which case simply setting it as free would not
+    // wake up the waiting task.
+    PendingSync *sync = pending_sync;
+    if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
+        debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
+        return;
     }

     // If the next thread on the run queue is a bound thread,
@@ -795,14 +800,21 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
 {
     Capability *cap = *pCap;

-    if ((pending_sync == SYNC_GC_PAR) && gcAllowed) {
-        traceEventGcStart(cap);
-        gcWorkerThread(cap);
-        traceEventGcEnd(cap);
-        traceSparkCounters(cap);
-        // See Note [migrated bound threads 2]
-        if (task->cap == cap) {
-            return rtsTrue;
+    if (gcAllowed)
+    {
+        PendingSync *sync = pending_sync;
+
+        if (sync && sync->type == SYNC_GC_PAR) {
+            if (! sync->idle[cap->no]) {
+                traceEventGcStart(cap);
+                gcWorkerThread(cap);
+                traceEventGcEnd(cap);
+                traceSparkCounters(cap);
+                // See Note [migrated bound threads 2]
+                if (task->cap == cap) {
+                    return rtsTrue;
+                }
+            }
         }
     }

diff --git a/rts/Capability.h b/rts/Capability.h
index 561d369a21..85fb53457d 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -225,14 +225,29 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
 extern Capability **capabilities;

 //
+// Types of global synchronisation
+//
+typedef enum {
+    SYNC_OTHER,
+    SYNC_GC_SEQ,
+    SYNC_GC_PAR
+} SyncType;
+
+//
+// Details about a global synchronisation
+//
+typedef struct {
+    SyncType type;              // The kind of synchronisation
+    rtsBool *idle;
+    Task *task;                 // The Task performing the sync
+} PendingSync;
+
+//
 // Indicates that the RTS wants to synchronise all the Capabilities
 // for some reason.  All Capabilities should stop and return to the
 // scheduler.
 //
-#define SYNC_GC_SEQ 1
-#define SYNC_GC_PAR 2
-#define SYNC_OTHER  3
-extern volatile StgWord pending_sync;
+extern PendingSync * volatile pending_sync;

 // Acquires a capability at a return point.  If *cap is non-NULL, then
 // this is taken as a preference for the Capability we wish to
diff --git a/rts/RtsFlags.c b/rts/RtsFlags.c
index bffb1287e5..73c5b4564a 100644
--- a/rts/RtsFlags.c
+++ b/rts/RtsFlags.c
@@ -226,6 +226,7 @@ void initRtsFlagsDefaults(void)
     RtsFlags.ParFlags.parGcLoadBalancingEnabled = rtsTrue;
     RtsFlags.ParFlags.parGcLoadBalancingGen = 1;
     RtsFlags.ParFlags.parGcNoSyncWithIdle   = 0;
+    RtsFlags.ParFlags.parGcThreads          = 0; /* defaults to -N */
     RtsFlags.ParFlags.setAffinity           = 0;
 #endif

@@ -388,6 +389,7 @@ usage_text[] = {
"             (default: 0, -qg alone turns off parallel GC)",
"  -qb[<n>]  Use load-balancing in the parallel GC only for generations >= <n>",
"             (default: 1, -qb alone turns off load-balancing)",
+"  -qn<n>    Use <n> threads for parallel GC (defaults to value of -N)",
"  -qa       Use the OS to set thread affinity (experimental)",
"  -qm       Don't automatically migrate threads between CPUs",
"  -qi<n>    If a processor has been idle for the last <n> GCs, do not",
@@ -1130,6 +1132,17 @@ error = rtsTrue;
                           RtsFlags.ParFlags.parGcNoSyncWithIdle
                               = strtol(rts_argv[arg]+3, (char **) NULL, 10);
                           break;
+                      case 'n': {
+                          int threads;
+                          threads = strtol(rts_argv[arg]+3, (char **) NULL, 10);
+                          if (threads <= 0) {
+                              errorBelch("-qn must be 1 or greater");
+                              error = rtsTrue;
+                          } else {
+                              RtsFlags.ParFlags.parGcThreads = threads;
+                          }
+                          break;
+                      }
                       case 'a':
                           RtsFlags.ParFlags.setAffinity = rtsTrue;
                           break;
@@ -1370,6 +1383,13 @@ static void normaliseRtsOpts (void)
                    "of the stack chunk size (-kc)");
         errorUsage();
     }
+
+#ifdef THREADED_RTS
+    if (RtsFlags.ParFlags.parGcThreads > RtsFlags.ParFlags.nNodes) {
+        errorBelch("GC threads (-qn) must be between 1 and the value of -N");
+        errorUsage();
+    }
+#endif
 }

 static void errorUsage (void)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 31ee99a3fa..abf3be58af 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -103,11 +103,6 @@ Mutex sched_mutex;
 #define FORKPROCESS_PRIMOP_SUPPORTED
 #endif

-// Local stats
-#ifdef THREADED_RTS
-static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
-#endif
-
 /* -----------------------------------------------------------------------------
  * static function prototypes
  * -------------------------------------------------------------------------- */
@@ -125,9 +120,11 @@ static void scheduleFindWork (Capability **pcap);
 static void scheduleYield (Capability **pcap, Task *task);
 #endif
 #if defined(THREADED_RTS)
-static nat requestSync (Capability **pcap, Task *task, nat sync_type);
+static rtsBool requestSync (Capability **pcap, Task *task,
+                            PendingSync *sync_type, SyncType *prev_sync_type);
 static void acquireAllCapabilities(Capability *cap, Task *task);
 static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
+static void stopAllCapabilities(Capability **pCap, Task *task);
 static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
@@ -827,6 +824,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         // release the capabilities
         for (i = 0; i < n_free_caps; i++) {
             task->cap = free_caps[i];
+            // The idea behind waking up the capability unconditionally is that
+            // it might be able to steal sparks. Perhaps we should only do this
+            // if there were sparks to steal?
             releaseAndWakeupCapability(free_caps[i]);
         }
     }
@@ -1351,53 +1351,102 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
 }

 /* -----------------------------------------------------------------------------
- * Start a synchronisation of all capabilities
+ * stopAllCapabilities()
+ *
+ * Stop all Haskell execution.  This is used when we need to make some global
+ * change to the system, such as altering the number of capabilities, or
+ * forking.
+ *
+ * To resume after stopAllCapabilities(), use releaseAllCapabilities().
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+static void stopAllCapabilities (Capability **pCap, Task *task)
+{
+    rtsBool was_syncing;
+    SyncType prev_sync_type;
+
+    PendingSync sync = {
+        .type = SYNC_OTHER,
+        .idle = NULL,
+        .task = task
+    };
+
+    do {
+        was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
+    } while (was_syncing);
+
+    acquireAllCapabilities(*pCap,task);
+
+    pending_sync = 0;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ * requestSync()
+ *
+ * Commence a synchronisation between all capabilities.  Normally not called
+ * directly, instead use stopAllCapabilities().  This is used by the GC, which
+ * has some special synchronisation requirements.
+ *
+ * Returns:
+ *    rtsFalse if we successfully got a sync
+ *    rtsTrue  if there was another sync request in progress,
+ *             and we yielded to it.  The value returned is the
+ *             type of the other sync request.
  * -------------------------------------------------------------------------- */

-// Returns:
-//    0      if we successfully got a sync
-//    non-0  if there was another sync request in progress,
-//           and we yielded to it.  The value returned is the
-//           type of the other sync request.
-//
 #if defined(THREADED_RTS)
-static nat requestSync (Capability **pcap, Task *task, nat sync_type)
+static rtsBool requestSync (
+    Capability **pcap, Task *task, PendingSync *sync,
+    SyncType *prev_sync_type)
 {
-    nat prev_pending_sync;
+    PendingSync *prev_sync;

-    prev_pending_sync = cas(&pending_sync, 0, sync_type);
+    prev_sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
+                                  (StgWord)NULL,
+                                  (StgWord)sync);

-    if (prev_pending_sync)
+    if (prev_sync)
     {
         do {
             debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
-                       prev_pending_sync);
+                       prev_sync->type);
             ASSERT(*pcap);
             yieldCapability(pcap,task,rtsTrue);
         } while (pending_sync);
-        return prev_pending_sync; // NOTE: task->cap might have changed now
+        // NOTE: task->cap might have changed now
+        *prev_sync_type = prev_sync->type;
+        return rtsTrue;
     }
     else
     {
-        return 0;
+        return rtsFalse;
     }
 }
+#endif

-//
-// Grab all the capabilities except the one we already hold.  Used
-// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
-// before a fork (SYNC_OTHER).
-//
-// Only call this after requestSync(), otherwise a deadlock might
-// ensue if another thread is trying to synchronise.
-//
+/* -----------------------------------------------------------------------------
+ * acquireAllCapabilities()
+ *
+ * Grab all the capabilities except the one we already hold.  Used
+ * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
+ * before a fork (SYNC_OTHER).
+ *
+ * Only call this after requestSync(), otherwise a deadlock might
+ * ensue if another thread is trying to synchronise.
+ * -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
 static void acquireAllCapabilities(Capability *cap, Task *task)
 {
     Capability *tmpcap;
     nat i;

+    ASSERT(pending_sync != NULL);
     for (i=0; i < n_capabilities; i++) {
-        debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
+        debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
+                   i, n_capabilities);
         tmpcap = capabilities[i];
         if (tmpcap != cap) {
             // we better hope this task doesn't get migrated to
@@ -1414,7 +1463,16 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
     }
     task->cap = cap;
 }
+#endif
+
+/* -----------------------------------------------------------------------------
+ * releaseAllcapabilities()
+ *
+ * Assuming this thread holds all the capabilities, release them all except for
+ * the one passed in as cap.
+ * -------------------------------------------------------------------------- */

+#ifdef THREADED_RTS
 static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
 {
     nat i;
@@ -1443,8 +1501,11 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
     rtsBool major_gc;
 #ifdef THREADED_RTS
     nat gc_type;
-    nat i, sync;
+    nat i;
+    nat need_idle;
+    nat n_idle_caps = 0, n_failed_trygrab_idles = 0;
     StgTSO *tso;
+    rtsBool *idle_cap;
 #endif

     if (sched_state == SCHED_SHUTTING_DOWN) {
@@ -1472,6 +1533,13 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
         gc_type = SYNC_GC_SEQ;
     }

+    if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
+        need_idle = stg_max(0, enabled_capabilities -
+                               RtsFlags.ParFlags.parGcThreads);
+    } else {
+        need_idle = 0;
+    }
+
     // In order to GC, there must be no threads running Haskell code.
     // Therefore, the GC thread needs to hold *all* the capabilities,
     // and release them after the GC has completed.
@@ -1487,26 +1555,70 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
        threads if pending_sync is set. Tested inside
        yieldCapability() and releaseCapability() in Capability.c */

-    do {
-        sync = requestSync(pcap, task, gc_type);
-        cap = *pcap;
-        if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
-            // someone else had a pending sync request for a GC, so
-            // let's assume GC has been done and we don't need to GC
-            // again.
-            return;
-        }
-        if (sched_state == SCHED_SHUTTING_DOWN) {
-            // The scheduler might now be shutting down.  We tested
-            // this above, but it might have become true since then as
-            // we yielded the capability in requestSync().
-            return;
-        }
-    } while (sync);
+    PendingSync sync = {
+        .type = gc_type,
+        .idle = NULL,
+        .task = task
+    };
+
+    {
+        SyncType prev_sync = 0;
+        rtsBool was_syncing;
+        do {
+            // We need an array of size n_capabilities, but since this may
+            // change each time around the loop we must allocate it afresh.
+            idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
+                                                 sizeof(rtsBool),
+                                                 "scheduleDoGC");
+            sync.idle = idle_cap;
+
+            // When using +RTS -qn, we need some capabilities to be idle during
+            // GC.  The best bet is to choose some inactive ones, so we look for
+            // those first:
+            nat n_idle = need_idle;
+            for (i=0; i < n_capabilities; i++) {
+                if (capabilities[i]->disabled) {
+                    idle_cap[i] = rtsTrue;
+                } else if (n_idle > 0 &&
+                           capabilities[i]->running_task == NULL) {
+                    debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
+                    n_idle--;
+                    idle_cap[i] = rtsTrue;
+                } else {
+                    idle_cap[i] = rtsFalse;
+                }
+            }
+            // If we didn't find enough inactive capabilities, just pick some
+            // more to be idle.
+            for (i=0; n_idle > 0 && i < n_capabilities; i++) {
+                if (!idle_cap[i] && i != cap->no) {
+                    idle_cap[i] = rtsTrue;
+                    n_idle--;
+                }
+            }
+            ASSERT(n_idle == 0);
+
+            was_syncing = requestSync(pcap, task, &sync, &prev_sync);
+            cap = *pcap;
+            if (was_syncing) {
+                stgFree(idle_cap);
+            }
+            if (was_syncing && (prev_sync == SYNC_GC_SEQ ||
+                                prev_sync == SYNC_GC_PAR)) {
+                // someone else had a pending sync request for a GC, so
+                // let's assume GC has been done and we don't need to GC
+                // again.
+                return;
+            }
+            if (sched_state == SCHED_SHUTTING_DOWN) {
+                // The scheduler might now be shutting down.  We tested
+                // this above, but it might have become true since then as
+                // we yielded the capability in requestSync().
+                return;
+            }
+        } while (was_syncing);
+    }

-    // don't declare this until after we have sync'd, because
-    // n_capabilities may change.
-    rtsBool idle_cap[n_capabilities];
 #ifdef DEBUG
     unsigned int old_n_capabilities = n_capabilities;
 #endif
@@ -1516,18 +1628,13 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,

     // The final shutdown GC is always single-threaded, because it's
     // possible that some of the Capabilities have no worker threads.

-    if (gc_type == SYNC_GC_SEQ)
-    {
+    if (gc_type == SYNC_GC_SEQ) {
         traceEventRequestSeqGc(cap);
-    }
-    else
-    {
+    } else {
         traceEventRequestParGc(cap);
-        debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
     }

-    if (gc_type == SYNC_GC_SEQ)
-    {
+    if (gc_type == SYNC_GC_SEQ) {
         // single-threaded GC: grab all the capabilities
         acquireAllCapabilities(cap,task);
     }
@@ -1543,31 +1650,45 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,

         if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
             || (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
-                collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
+                collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
+        {
             for (i=0; i < n_capabilities; i++) {
                 if (capabilities[i]->disabled) {
                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
+                    if (idle_cap[i]) {
+                        n_idle_caps++;
+                    }
                 } else {
-                    idle_cap[i] = rtsFalse;
+                    if (i != cap->no && idle_cap[i]) {
+                        Capability *tmpcap = capabilities[i];
+                        task->cap = tmpcap;
+                        waitForCapability(&tmpcap, task);
+                        n_idle_caps++;
+                    }
                 }
             }
-        } else {
+        }
+        else
+        {
             for (i=0; i < n_capabilities; i++) {
                 if (capabilities[i]->disabled) {
                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
-                } else if (i == cap->no ||
-                           capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
-                    idle_cap[i] = rtsFalse;
-                } else {
+                    if (idle_cap[i]) {
+                        n_idle_caps++;
+                    }
+                } else if (i != cap->no &&
+                           capabilities[i]->idle >=
+                           RtsFlags.ParFlags.parGcNoSyncWithIdle) {
                     idle_cap[i] = tryGrabCapability(capabilities[i], task);
-                    if (!idle_cap[i]) {
-                        n_failed_trygrab_idles++;
-                    } else {
+                    if (idle_cap[i]) {
                         n_idle_caps++;
+                    } else {
+                        n_failed_trygrab_idles++;
                     }
                 }
             }
         }
+        debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);

         // We set the gc_thread[i]->idle flag if that
         // capability/thread is not participating in this collection.
@@ -1719,6 +1840,8 @@ delete_threads_and_gc:
         }
         task->cap = cap;
     }
+
+    stgFree(idle_cap);
 #endif

     if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
@@ -1774,9 +1897,6 @@ forkProcess(HsStablePtr *entry
     nat g;
     Task *task = NULL;
     nat i;
-#ifdef THREADED_RTS
-    nat sync;
-#endif

     debugTrace(DEBUG_sched, "forking!");

@@ -1786,13 +1906,7 @@ forkProcess(HsStablePtr *entry
     waitForCapability(&cap, task);

 #ifdef THREADED_RTS
-    do {
-        sync = requestSync(&cap, task, SYNC_OTHER);
-    } while (sync);
-
-    acquireAllCapabilities(cap,task);
-
-    pending_sync = 0;
+    stopAllCapabilities(&cap, task);
 #endif

     // no funny business: hold locks while we fork, otherwise if some
@@ -1980,7 +2094,6 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
 #else
     Task *task;
     Capability *cap;
-    nat sync;
     nat n;
     Capability *old_capabilities = NULL;
     nat old_n_capabilities = n_capabilities;
@@ -1993,13 +2106,7 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
     cap = rts_lock();
     task = cap->running_task;

-    do {
-        sync = requestSync(&cap, task, SYNC_OTHER);
-    } while (sync);
-
-    acquireAllCapabilities(cap,task);
-
-    pending_sync = 0;
+    stopAllCapabilities(&cap, task);

     if (new_n_capabilities < enabled_capabilities)
     {
diff --git a/rts/Task.h b/rts/Task.h
index bcf456d270..209df49ddb 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -125,18 +125,16 @@ typedef struct Task_ {
     rtsBool wakeup;
 #endif

-    // This points to the Capability that the Task "belongs" to.  If
-    // the Task owns a Capability, then task->cap points to it.  If
-    // the task does not own a Capability, then either (a) if the task
-    // is a worker, then task->cap points to the Capability it belongs
-    // to, or (b) it is returning from a foreign call, then task->cap
-    // points to the Capability with the returning_worker queue that this
-    // this Task is on.
+    // If the task owns a Capability, task->cap points to it.  (occasionally a
+    // task may own multiple capabilities, in which case task->cap may point to
+    // any of them.  We must be careful to set task->cap to the appropriate one
+    // when using Capability APIs.)
     //
-    // When a task goes to sleep, it may be migrated to a different
-    // Capability.  Hence, we always check task->cap on wakeup.  To
-    // syncrhonise between the migrater and the migratee, task->lock
-    // must be held when modifying task->cap.
+    // If the task is a worker, task->cap points to the Capability on which it
+    // is queued.
+    //
+    // If the task is in an unsafe foreign call, then task->cap can be used to
+    // retrieve the capability (see rts_unsafeGetMyCapability()).
     struct Capability_ *cap;

     // The current top-of-stack InCall
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index df73ab8314..d3f3ab0166 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -619,8 +619,7 @@ GarbageCollect (nat collect_gen,
       live_words  += genLiveWords(gen);
       live_blocks += genLiveBlocks(gen);

-      // add in the partial blocks in the gen_workspaces, but ignore gen 0
-      // if this is a local GC (we can't count another capability's part_list)
+      // add in the partial blocks in the gen_workspaces
       {
           nat i;
           for (i = 0; i < n_capabilities; i++) {
@@ -1071,7 +1070,6 @@ waitForGcThreads (Capability *cap USED_IF_THREADS)

     stat_startGCSync(gc_threads[cap->no]);

-
     while(retry) {
         for (i=0; i < n_threads; i++) {
             if (i == me || gc_threads[i]->idle) continue;