summaryrefslogtreecommitdiff
path: root/rts/Schedule.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2016-04-09 20:45:50 +0100
committerSimon Marlow <smarlow@fb.com>2016-05-04 05:30:30 -0700
commit76ee260778991367b8dbf07ecf7afd31f826c824 (patch)
treefbddddf878413dab3c01abf8108c26d2bd20db4c /rts/Schedule.c
parentf9d93751126e58fb990335095e02fd81a3595fde (diff)
downloadhaskell-76ee260778991367b8dbf07ecf7afd31f826c824.tar.gz
Allow limiting the number of GC threads (+RTS -qn<n>)
This allows the GC to use fewer threads than the number of capabilities. At each GC, we choose some of the capabilities to be "idle", which means that the thread running on that capability (if any) will sleep for the duration of the GC, and the other threads will do its work. We choose capabilities that are already idle (if any) to be the idle capabilities. The idea is that this helps in the following situation: * We want to use a large -N value so as to make use of hyperthreaded cores * We use a large heap size, so GC is infrequent * But we don't want to use all -N threads in the GC, because that thrashes the memory too much. See docs for usage.
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--rts/Schedule.c277
1 files changed, 192 insertions, 85 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 31ee99a3fa..abf3be58af 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -103,11 +103,6 @@ Mutex sched_mutex;
#define FORKPROCESS_PRIMOP_SUPPORTED
#endif
-// Local stats
-#ifdef THREADED_RTS
-static nat n_failed_trygrab_idles = 0, n_idle_caps = 0;
-#endif
-
/* -----------------------------------------------------------------------------
* static function prototypes
* -------------------------------------------------------------------------- */
@@ -125,9 +120,11 @@ static void scheduleFindWork (Capability **pcap);
static void scheduleYield (Capability **pcap, Task *task);
#endif
#if defined(THREADED_RTS)
-static nat requestSync (Capability **pcap, Task *task, nat sync_type);
+static rtsBool requestSync (Capability **pcap, Task *task,
+ PendingSync *sync_type, SyncType *prev_sync_type);
static void acquireAllCapabilities(Capability *cap, Task *task);
static void releaseAllCapabilities(nat n, Capability *cap, Task *task);
+static void stopAllCapabilities(Capability **pCap, Task *task);
static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
@@ -827,6 +824,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
+ // The idea behind waking up the capability unconditionally is that
+ // it might be able to steal sparks. Perhaps we should only do this
+ // if there were sparks to steal?
releaseAndWakeupCapability(free_caps[i]);
}
}
@@ -1351,53 +1351,102 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
}
/* -----------------------------------------------------------------------------
- * Start a synchronisation of all capabilities
+ * stopAllCapabilities()
+ *
+ * Stop all Haskell execution. This is used when we need to make some global
+ * change to the system, such as altering the number of capabilities, or
+ * forking.
+ *
+ * To resume after stopAllCapabilities(), use releaseAllCapabilities().
+ * -------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+static void stopAllCapabilities (Capability **pCap, Task *task)
+{
+ rtsBool was_syncing;
+ SyncType prev_sync_type;
+
+ PendingSync sync = {
+ .type = SYNC_OTHER,
+ .idle = NULL,
+ .task = task
+ };
+
+ do {
+ was_syncing = requestSync(pCap, task, &sync, &prev_sync_type);
+ } while (was_syncing);
+
+ acquireAllCapabilities(*pCap,task);
+
+ pending_sync = 0;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
+ * requestSync()
+ *
+ * Commence a synchronisation between all capabilities. Normally not called
+ * directly, instead use stopAllCapabilities(). This is used by the GC, which
+ * has some special synchronisation requirements.
+ *
+ * Returns:
+ * rtsFalse if we successfully got a sync
+ * rtsTrue if there was another sync request in progress,
+ * and we yielded to it. The value returned is the
+ * type of the other sync request.
* -------------------------------------------------------------------------- */
-// Returns:
-// 0 if we successfully got a sync
-// non-0 if there was another sync request in progress,
-// and we yielded to it. The value returned is the
-// type of the other sync request.
-//
#if defined(THREADED_RTS)
-static nat requestSync (Capability **pcap, Task *task, nat sync_type)
+static rtsBool requestSync (
+ Capability **pcap, Task *task, PendingSync *sync,
+ SyncType *prev_sync_type)
{
- nat prev_pending_sync;
+ PendingSync *prev_sync;
- prev_pending_sync = cas(&pending_sync, 0, sync_type);
+ prev_sync = (PendingSync*)cas((StgVolatilePtr)&pending_sync,
+ (StgWord)NULL,
+ (StgWord)sync);
- if (prev_pending_sync)
+ if (prev_sync)
{
do {
debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
- prev_pending_sync);
+ prev_sync->type);
ASSERT(*pcap);
yieldCapability(pcap,task,rtsTrue);
} while (pending_sync);
- return prev_pending_sync; // NOTE: task->cap might have changed now
+ // NOTE: task->cap might have changed now
+ *prev_sync_type = prev_sync->type;
+ return rtsTrue;
}
else
{
- return 0;
+ return rtsFalse;
}
}
+#endif
-//
-// Grab all the capabilities except the one we already hold. Used
-// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
-// before a fork (SYNC_OTHER).
-//
-// Only call this after requestSync(), otherwise a deadlock might
-// ensue if another thread is trying to synchronise.
-//
+/* -----------------------------------------------------------------------------
+ * acquireAllCapabilities()
+ *
+ * Grab all the capabilities except the one we already hold. Used
+ * when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
+ * before a fork (SYNC_OTHER).
+ *
+ * Only call this after requestSync(), otherwise a deadlock might
+ * ensue if another thread is trying to synchronise.
+ * -------------------------------------------------------------------------- */
+
+#ifdef THREADED_RTS
static void acquireAllCapabilities(Capability *cap, Task *task)
{
Capability *tmpcap;
nat i;
+ ASSERT(pending_sync != NULL);
for (i=0; i < n_capabilities; i++) {
- debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
+ debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
+ i, n_capabilities);
tmpcap = capabilities[i];
if (tmpcap != cap) {
// we better hope this task doesn't get migrated to
@@ -1414,7 +1463,16 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
}
task->cap = cap;
}
+#endif
+
+/* -----------------------------------------------------------------------------
+ * releaseAllcapabilities()
+ *
+ * Assuming this thread holds all the capabilities, release them all except for
+ * the one passed in as cap.
+ * -------------------------------------------------------------------------- */
+#ifdef THREADED_RTS
static void releaseAllCapabilities(nat n, Capability *cap, Task *task)
{
nat i;
@@ -1443,8 +1501,11 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
rtsBool major_gc;
#ifdef THREADED_RTS
nat gc_type;
- nat i, sync;
+ nat i;
+ nat need_idle;
+ nat n_idle_caps = 0, n_failed_trygrab_idles = 0;
StgTSO *tso;
+ rtsBool *idle_cap;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
@@ -1472,6 +1533,13 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
gc_type = SYNC_GC_SEQ;
}
+ if (gc_type == SYNC_GC_PAR && RtsFlags.ParFlags.parGcThreads > 0) {
+ need_idle = stg_max(0, enabled_capabilities -
+ RtsFlags.ParFlags.parGcThreads);
+ } else {
+ need_idle = 0;
+ }
+
// In order to GC, there must be no threads running Haskell code.
// Therefore, the GC thread needs to hold *all* the capabilities,
// and release them after the GC has completed.
@@ -1487,26 +1555,70 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
threads if pending_sync is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
- do {
- sync = requestSync(pcap, task, gc_type);
- cap = *pcap;
- if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
- // someone else had a pending sync request for a GC, so
- // let's assume GC has been done and we don't need to GC
- // again.
- return;
- }
- if (sched_state == SCHED_SHUTTING_DOWN) {
- // The scheduler might now be shutting down. We tested
- // this above, but it might have become true since then as
- // we yielded the capability in requestSync().
- return;
- }
- } while (sync);
+ PendingSync sync = {
+ .type = gc_type,
+ .idle = NULL,
+ .task = task
+ };
+
+ {
+ SyncType prev_sync = 0;
+ rtsBool was_syncing;
+ do {
+ // We need an array of size n_capabilities, but since this may
+ // change each time around the loop we must allocate it afresh.
+ idle_cap = (rtsBool *)stgMallocBytes(n_capabilities *
+ sizeof(rtsBool),
+ "scheduleDoGC");
+ sync.idle = idle_cap;
+
+ // When using +RTS -qn, we need some capabilities to be idle during
+ // GC. The best bet is to choose some inactive ones, so we look for
+ // those first:
+ nat n_idle = need_idle;
+ for (i=0; i < n_capabilities; i++) {
+ if (capabilities[i]->disabled) {
+ idle_cap[i] = rtsTrue;
+ } else if (n_idle > 0 &&
+ capabilities[i]->running_task == NULL) {
+ debugTrace(DEBUG_sched, "asking for cap %d to be idle", i);
+ n_idle--;
+ idle_cap[i] = rtsTrue;
+ } else {
+ idle_cap[i] = rtsFalse;
+ }
+ }
+ // If we didn't find enough inactive capabilities, just pick some
+ // more to be idle.
+ for (i=0; n_idle > 0 && i < n_capabilities; i++) {
+ if (!idle_cap[i] && i != cap->no) {
+ idle_cap[i] = rtsTrue;
+ n_idle--;
+ }
+ }
+ ASSERT(n_idle == 0);
+
+ was_syncing = requestSync(pcap, task, &sync, &prev_sync);
+ cap = *pcap;
+ if (was_syncing) {
+ stgFree(idle_cap);
+ }
+ if (was_syncing && (prev_sync == SYNC_GC_SEQ ||
+ prev_sync == SYNC_GC_PAR)) {
+ // someone else had a pending sync request for a GC, so
+ // let's assume GC has been done and we don't need to GC
+ // again.
+ return;
+ }
+ if (sched_state == SCHED_SHUTTING_DOWN) {
+ // The scheduler might now be shutting down. We tested
+ // this above, but it might have become true since then as
+ // we yielded the capability in requestSync().
+ return;
+ }
+ } while (was_syncing);
+ }
- // don't declare this until after we have sync'd, because
- // n_capabilities may change.
- rtsBool idle_cap[n_capabilities];
#ifdef DEBUG
unsigned int old_n_capabilities = n_capabilities;
#endif
@@ -1516,18 +1628,13 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// The final shutdown GC is always single-threaded, because it's
// possible that some of the Capabilities have no worker threads.
- if (gc_type == SYNC_GC_SEQ)
- {
+ if (gc_type == SYNC_GC_SEQ) {
traceEventRequestSeqGc(cap);
- }
- else
- {
+ } else {
traceEventRequestParGc(cap);
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- if (gc_type == SYNC_GC_SEQ)
- {
+ if (gc_type == SYNC_GC_SEQ) {
// single-threaded GC: grab all the capabilities
acquireAllCapabilities(cap,task);
}
@@ -1543,31 +1650,45 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
if (RtsFlags.ParFlags.parGcNoSyncWithIdle == 0
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
- collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
+ collect_gen >= RtsFlags.ParFlags.parGcLoadBalancingGen))
+ {
for (i=0; i < n_capabilities; i++) {
if (capabilities[i]->disabled) {
idle_cap[i] = tryGrabCapability(capabilities[i], task);
+ if (idle_cap[i]) {
+ n_idle_caps++;
+ }
} else {
- idle_cap[i] = rtsFalse;
+ if (i != cap->no && idle_cap[i]) {
+ Capability *tmpcap = capabilities[i];
+ task->cap = tmpcap;
+ waitForCapability(&tmpcap, task);
+ n_idle_caps++;
+ }
}
}
- } else {
+ }
+ else
+ {
for (i=0; i < n_capabilities; i++) {
if (capabilities[i]->disabled) {
idle_cap[i] = tryGrabCapability(capabilities[i], task);
- } else if (i == cap->no ||
- capabilities[i]->idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
- idle_cap[i] = rtsFalse;
- } else {
+ if (idle_cap[i]) {
+ n_idle_caps++;
+ }
+ } else if (i != cap->no &&
+ capabilities[i]->idle >=
+ RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = tryGrabCapability(capabilities[i], task);
- if (!idle_cap[i]) {
- n_failed_trygrab_idles++;
- } else {
+ if (idle_cap[i]) {
n_idle_caps++;
+ } else {
+ n_failed_trygrab_idles++;
}
}
}
}
+ debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
// We set the gc_thread[i]->idle flag if that
// capability/thread is not participating in this collection.
@@ -1719,6 +1840,8 @@ delete_threads_and_gc:
}
task->cap = cap;
}
+
+ stgFree(idle_cap);
#endif
if (heap_overflow && sched_state < SCHED_INTERRUPTING) {
@@ -1774,9 +1897,6 @@ forkProcess(HsStablePtr *entry
nat g;
Task *task = NULL;
nat i;
-#ifdef THREADED_RTS
- nat sync;
-#endif
debugTrace(DEBUG_sched, "forking!");
@@ -1786,13 +1906,7 @@ forkProcess(HsStablePtr *entry
waitForCapability(&cap, task);
#ifdef THREADED_RTS
- do {
- sync = requestSync(&cap, task, SYNC_OTHER);
- } while (sync);
-
- acquireAllCapabilities(cap,task);
-
- pending_sync = 0;
+ stopAllCapabilities(&cap, task);
#endif
// no funny business: hold locks while we fork, otherwise if some
@@ -1980,7 +2094,6 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
#else
Task *task;
Capability *cap;
- nat sync;
nat n;
Capability *old_capabilities = NULL;
nat old_n_capabilities = n_capabilities;
@@ -1993,13 +2106,7 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
cap = rts_lock();
task = cap->running_task;
- do {
- sync = requestSync(&cap, task, SYNC_OTHER);
- } while (sync);
-
- acquireAllCapabilities(cap,task);
-
- pending_sync = 0;
+ stopAllCapabilities(&cap, task);
if (new_n_capabilities < enabled_capabilities)
{