summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2011-12-13 13:09:46 +0000
committerSimon Marlow <marlowsd@gmail.com>2011-12-15 15:48:54 +0000
commit9bae79159d3cb5cbb6491711341aa9b07d703ae6 (patch)
tree4aa767f2a58d4d42637dc999ab469dd2dd07db19
parentdff852b1b65d07a4a400d3f20c854172c8fcecaf (diff)
downloadhaskell-9bae79159d3cb5cbb6491711341aa9b07d703ae6.tar.gz
Support for reducing the number of Capabilities with setNumCapabilities
This patch allows setNumCapabilities to /reduce/ the number of active capabilities as well as increase it. This is particularly tricky to do, because a Capability is a large data structure and ties into the rest of the system in many ways. Trying to clean it all up would be extremely error prone. So instead, the solution is to mark the extra capabilities as "disabled". This has the following consequences: - threads on a disabled capability are migrated away by the scheduler loop - disabled capabilities do not participate in GC (see scheduleDoGC()) - No spark threads are created on this capability (see scheduleActivateSpark()) - We do not attempt to migrate threads *to* a disabled capability (see schedulePushWork()). So a disabled capability should do no work, and does not participate in GC, although it remains alive in other respects. For example, a blocked thread might wake up on a disabled capability, and it will get quickly migrated to a live capability. A disabled capability can still initiate GC if necessary. Indeed, it turns out to be hard to migrate bound threads, so we wait until the next GC to do this (see comments for details).
-rw-r--r--rts/Capability.c19
-rw-r--r--rts/Capability.h4
-rw-r--r--rts/Schedule.c244
3 files changed, 195 insertions, 72 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 41efb176fd..d04d007006 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -34,6 +34,7 @@
Capability MainCapability;
nat n_capabilities = 0;
+nat enabled_capabilities = 0;
Capability *capabilities = NULL;
// Holds the Capability which last became free. This is used so that
@@ -323,6 +324,8 @@ initCapabilities( void )
#endif
+ enabled_capabilities = n_capabilities;
+
// There are no free capabilities to begin with. We will start
// a worker Task to each Capability, which will quickly put the
// Capability on the free list when it finds nothing to do.
@@ -493,7 +496,7 @@ releaseCapability_ (Capability* cap,
// anything else to do, give the Capability to a worker thread.
if (always_wakeup ||
!emptyRunQueue(cap) || !emptyInbox(cap) ||
- !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ (!cap->disabled && !emptySparkPoolCap(cap)) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
@@ -682,7 +685,8 @@ yieldCapability (Capability** pCap, Task *task)
gcWorkerThread(cap);
traceEventGcEnd(cap);
traceSparkCounters(cap);
- return;
+ // See Note [migrated bound threads 2]
+ if (task->cap == cap) return;
}
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
@@ -768,6 +772,17 @@ yieldCapability (Capability** pCap, Task *task)
// hold Capabilty C, and task->cap == C, then task cannot be
// migrated under our feet.
+// Note [migrated bound threads 2]
+//
+// Second tricky case;
+// - A bound Task becomes a GC thread
+// - scheduleDoGC() migrates the thread belonging to this Task,
+// because the Capability it is on is disabled
+// - after GC, gcWorkerThread() returns, but now we are
+// holding a Capability that is not the same as task->cap
+// - Hence we must check for this case and immediately give up the
+// cap we hold.
+
/* ----------------------------------------------------------------------------
* prodCapability
*
diff --git a/rts/Capability.h b/rts/Capability.h
index f60adf9de4..91b4567186 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -49,6 +49,8 @@ struct Capability_ {
// Has there been any activity on this Capability since the last GC?
nat idle;
+ rtsBool disabled;
+
// The run queue. The Task owning this Capability has exclusive
// access to its run queue, so can wake up threads without
// taking a lock, and the common path through the scheduler is
@@ -197,6 +199,8 @@ INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
// declared in includes/rts/Threads.h:
// extern nat n_capabilities;
+extern nat enabled_capabilities;
+
// Array of all the capabilities
//
extern Capability *capabilities;
diff --git a/rts/Schedule.c b/rts/Schedule.c
index eedff32ed1..72b7217ebb 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -133,7 +133,7 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
-static void scheduleFindWork (Capability *cap);
+static void scheduleFindWork (Capability **pcap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
@@ -145,8 +145,8 @@ static void startWorkerTasks (nat from USED_IF_THREADS, nat to USED_IF_THREADS);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
-static void scheduleProcessInbox(Capability *cap);
-static void scheduleDetectDeadlock (Capability *cap, Task *task);
+static void scheduleProcessInbox(Capability **cap);
+static void scheduleDetectDeadlock (Capability **pcap, Task *task);
static void schedulePushWork(Capability *cap, Task *task);
#if defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
@@ -159,8 +159,7 @@ static void scheduleHandleThreadBlocked( StgTSO *t );
static rtsBool scheduleHandleThreadFinished( Capability *cap, Task *task,
StgTSO *t );
static rtsBool scheduleNeedHeapProfile(rtsBool ready_to_gc);
-static Capability *scheduleDoGC(Capability *cap, Task *task,
- rtsBool force_major);
+static void scheduleDoGC(Capability **pcap, Task *task, rtsBool force_major);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteAllThreads (Capability *cap);
@@ -281,7 +280,7 @@ schedule (Capability *initialCapability, Task *task)
case SCHED_INTERRUPTING:
debugTrace(DEBUG_sched, "SCHED_INTERRUPTING");
/* scheduleDoGC() deletes all the threads */
- cap = scheduleDoGC(cap,task,rtsFalse);
+ scheduleDoGC(&cap,task,rtsFalse);
// after scheduleDoGC(), we must be shutting down. Either some
// other Capability did the final GC, or we did it above,
@@ -303,17 +302,13 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
- scheduleFindWork(cap);
+ scheduleFindWork(&cap);
/* work pushing, currently relevant only for THREADED_RTS:
(pushes threads, wakes up idle capabilities for stealing) */
schedulePushWork(cap,task);
- scheduleDetectDeadlock(cap,task);
-
-#if defined(THREADED_RTS)
- cap = task->cap; // reload cap, it might have changed
-#endif
+ scheduleDetectDeadlock(&cap,task);
// Normally, the only way we can get here with no threads to
// run is if a keyboard interrupt received during
@@ -396,6 +391,26 @@ schedule (Capability *initialCapability, Task *task)
deleteThread(cap,t);
}
+ // If this capability is disabled, migrate the thread away rather
+ // than running it. NB. but not if the thread is bound: it is
+ // really hard for a bound thread to migrate itself. Believe me,
+ // I tried several ways and couldn't find a way to do it.
+ // Instead, when everything is stopped for GC, we migrate all the
+ // threads on the run queue then (see scheduleDoGC()).
+ //
+ // ToDo: what about TSO_LOCKED? Currently we're migrating those
+ // when the number of capabilities drops, but we never migrate
+ // them back if it rises again. Presumably we should, but after
+ // the thread has been migrated we no longer know what capability
+ // it was originally on.
+#ifdef THREADED_RTS
+ if (cap->disabled && !t->bound) {
+ Capability *dest_cap = &capabilities[cap->no % enabled_capabilities];
+ migrateThread(cap, t, dest_cap);
+ continue;
+ }
+#endif
+
/* context switches are initiated by the timer signal, unless
* the user specified "context switch as often as possible", with
* +RTS -C0
@@ -558,7 +573,7 @@ run_thread:
}
if (ready_to_gc || scheduleNeedHeapProfile(ready_to_gc)) {
- cap = scheduleDoGC(cap,task,rtsFalse);
+ scheduleDoGC(&cap,task,rtsFalse);
}
} /* end of while() */
}
@@ -608,16 +623,16 @@ schedulePreLoop(void)
* -------------------------------------------------------------------------- */
static void
-scheduleFindWork (Capability *cap)
+scheduleFindWork (Capability **pcap)
{
- scheduleStartSignalHandlers(cap);
+ scheduleStartSignalHandlers(*pcap);
- scheduleProcessInbox(cap);
+ scheduleProcessInbox(pcap);
- scheduleCheckBlockedThreads(cap);
+ scheduleCheckBlockedThreads(*pcap);
#if defined(THREADED_RTS)
- if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+ if (emptyRunQueue(*pcap)) { scheduleActivateSpark(*pcap); }
#endif
}
@@ -707,10 +722,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// First grab as many free Capabilities as we can.
for (i=0, n_free_caps=0; i < n_capabilities; i++) {
cap0 = &capabilities[i];
- if (cap != cap0 && tryGrabCapability(cap0,task)) {
+ if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
- || cap->returning_tasks_hd != NULL
- || cap->inbox != (Message*)END_TSO_QUEUE) {
+ || cap0->returning_tasks_hd != NULL
+ || cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
releaseCapability(cap0);
@@ -869,9 +884,10 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
* ------------------------------------------------------------------------- */
static void
-scheduleDetectDeadlock (Capability *cap, Task *task)
+scheduleDetectDeadlock (Capability **pcap, Task *task)
{
- /*
+ Capability *cap = *pcap;
+ /*
* Detect deadlock: when we have no threads to run, there are no
* threads blocked, waiting for I/O, or sleeping, and all the
* other tasks are waiting for work, we must have a deadlock of
@@ -896,7 +912,8 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
// they are unreachable and will therefore be sent an
// exception. Any threads thus released will be immediately
// runnable.
- cap = scheduleDoGC (cap, task, rtsTrue/*force major GC*/);
+ scheduleDoGC (pcap, task, rtsTrue/*force major GC*/);
+ cap = *pcap;
// when force_major == rtsTrue. scheduleDoGC sets
// recent_activity to ACTIVITY_DONE_GC and turns off the timer
// signal.
@@ -976,16 +993,18 @@ scheduleSendPendingMessages(void)
* ------------------------------------------------------------------------- */
static void
-scheduleProcessInbox (Capability *cap USED_IF_THREADS)
+scheduleProcessInbox (Capability **pcap USED_IF_THREADS)
{
#if defined(THREADED_RTS)
Message *m, *next;
int r;
+ Capability *cap = *pcap;
while (!emptyInbox(cap)) {
if (cap->r.rCurrentNursery->link == NULL ||
g0->n_new_large_words >= large_alloc_lim) {
- scheduleDoGC(cap, cap->running_task, rtsFalse);
+ scheduleDoGC(pcap, cap->running_task, rtsFalse);
+ cap = *pcap;
}
// don't use a blocking acquire; if the lock is held by
@@ -1023,7 +1042,7 @@ scheduleProcessInbox (Capability *cap USED_IF_THREADS)
static void
scheduleActivateSpark(Capability *cap)
{
- if (anySparks())
+ if (anySparks() && !cap->disabled)
{
createSparkThread(cap);
debugTrace(DEBUG_sched, "creating a spark thread");
@@ -1415,21 +1434,24 @@ static void releaseAllCapabilities(Capability *cap, Task *task)
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
-static Capability *
-scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
+static void
+scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
+ rtsBool force_major)
{
+ Capability *cap = *pcap;
rtsBool heap_census;
#ifdef THREADED_RTS
rtsBool idle_cap[n_capabilities];
rtsBool gc_type;
nat i, sync;
+ StgTSO *tso;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
- return cap;
+ return;
}
#ifdef THREADED_RTS
@@ -1459,12 +1481,19 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
yieldCapability() and releaseCapability() in Capability.c */
do {
- sync = requestSync(&cap, task, gc_type);
+ sync = requestSync(pcap, task, gc_type);
+ cap = *pcap;
if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
- return cap;
+ return;
+ }
+ if (sched_state == SCHED_SHUTTING_DOWN) {
+ // The scheduler might now be shutting down. We tested
+ // this above, but it might have become true since then as
+ // we yielded the capability in requestSync().
+ return;
}
} while (sync);
@@ -1502,11 +1531,18 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
|| (RtsFlags.ParFlags.parGcLoadBalancingEnabled &&
N >= RtsFlags.ParFlags.parGcLoadBalancingGen)) {
for (i=0; i < n_capabilities; i++) {
- idle_cap[i] = rtsFalse;
+ if (capabilities[i].disabled) {
+ idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+ } else {
+ idle_cap[i] = rtsFalse;
+ }
}
} else {
for (i=0; i < n_capabilities; i++) {
- if (i == cap->no || capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
+ if (capabilities[i].disabled) {
+ idle_cap[i] = tryGrabCapability(&capabilities[i], task);
+ } else if (i == cap->no ||
+ capabilities[i].idle < RtsFlags.ParFlags.parGcNoSyncWithIdle) {
idle_cap[i] = rtsFalse;
} else {
idle_cap[i] = tryGrabCapability(&capabilities[i], task);
@@ -1570,6 +1606,29 @@ delete_threads_and_gc:
sched_state = SCHED_SHUTTING_DOWN;
}
+ /*
+ * When there are disabled capabilities, we want to migrate any
+ * threads away from them. Normally this happens in the
+ * scheduler's loop, but only for unbound threads - it's really
+ * hard for a bound thread to migrate itself. So we have another
+ * go here.
+ */
+#if defined(THREADED_RTS)
+ for (i = enabled_capabilities; i < n_capabilities; i++) {
+ Capability *tmp_cap, *dest_cap;
+ tmp_cap = &capabilities[i];
+ ASSERT(tmp_cap->disabled);
+ if (i != cap->no) {
+ dest_cap = &capabilities[i % enabled_capabilities];
+ while (!emptyRunQueue(tmp_cap)) {
+ tso = popRunQueue(tmp_cap);
+ migrateThread(tmp_cap, tso, dest_cap);
+ if (tso->bound) { tso->bound->task->cap = dest_cap; }
+ }
+ }
+ }
+#endif
+
heap_census = scheduleNeedHeapProfile(rtsTrue);
traceEventGcStart(cap);
@@ -1663,7 +1722,7 @@ delete_threads_and_gc:
}
#endif
- return cap;
+ return;
}
/* ---------------------------------------------------------------------------
@@ -1848,7 +1907,7 @@ forkProcess(HsStablePtr *entry
}
/* ---------------------------------------------------------------------------
- * Increase the number of Capabilities
+ * Changing the number of Capabilities
*
* Changing the number of Capabilities is very tricky! We can only do
* it with the system fully stopped, so we do a full sync with
@@ -1873,17 +1932,13 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
Capability *cap;
nat sync;
StgTSO* t;
- nat g;
- Capability *old_capabilities;
-
- if (new_n_capabilities == n_capabilities) return;
+ nat g, n;
+ Capability *old_capabilities = NULL;
- if (new_n_capabilities < n_capabilities) {
- barf("setNumCapabilities: reducing the number of Capabilities is not currently supported.");
- }
+ if (new_n_capabilities == enabled_capabilities) return;
debugTrace(DEBUG_sched, "changing the number of Capabilities from %d to %d",
- n_capabilities, new_n_capabilities);
+ enabled_capabilities, new_n_capabilities);
cap = rts_lock();
task = cap->running_task;
@@ -1896,31 +1951,76 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
pending_sync = 0;
+ if (new_n_capabilities < enabled_capabilities)
+ {
+ // Reducing the number of capabilities: we do not actually
+ // remove the extra capabilities, we just mark them as
+ // "disabled". This has the following effects:
+ //
+ // - threads on a disabled capability are migrated away by the
+ // scheduler loop
+ //
+ // - disabled capabilities do not participate in GC
+ // (see scheduleDoGC())
+ //
+ // - No spark threads are created on this capability
+ // (see scheduleActivateSpark())
+ //
+ // - We do not attempt to migrate threads *to* a disabled
+ // capability (see schedulePushWork()).
+ //
+ // but in other respects, a disabled capability remains
+ // alive. Threads may be woken up on a disabled capability,
+ // but they will be immediately migrated away.
+ //
+ // This approach is much easier than trying to actually remove
+ // the capability; we don't have to worry about GC data
+ // structures, the nursery, etc.
+ //
+ for (n = new_n_capabilities; n < enabled_capabilities; n++) {
+ capabilities[n].disabled = rtsTrue;
+ }
+ enabled_capabilities = new_n_capabilities;
+ }
+ else
+ {
+ // Increasing the number of enabled capabilities.
+ //
+ // enable any disabled capabilities, up to the required number
+ for (n = enabled_capabilities;
+ n < new_n_capabilities && n < n_capabilities; n++) {
+ capabilities[n].disabled = rtsFalse;
+ }
+ enabled_capabilities = n;
+
+ if (new_n_capabilities > n_capabilities) {
#if defined(TRACING)
- // Allocate eventlog buffers for the new capabilities. Note this
- // must be done before calling moreCapabilities(), because that
- // will emit events to add the new capabilities to capsets.
- tracingAddCapapilities(n_capabilities, new_n_capabilities);
+ // Allocate eventlog buffers for the new capabilities. Note this
+ // must be done before calling moreCapabilities(), because that
+ // will emit events to add the new capabilities to capsets.
+ tracingAddCapapilities(n_capabilities, new_n_capabilities);
#endif
- // Resize the capabilities array
- // NB. after this, capabilities points somewhere new. Any pointers
- // of type (Capability *) are now invalid.
- old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
+ // Resize the capabilities array
+ // NB. after this, capabilities points somewhere new. Any pointers
+ // of type (Capability *) are now invalid.
+ old_capabilities = moreCapabilities(n_capabilities, new_n_capabilities);
- // update our own cap pointer
- cap = &capabilities[cap->no];
+ // update our own cap pointer
+ cap = &capabilities[cap->no];
- // Resize and update storage manager data structures
- storageAddCapabilities(n_capabilities, new_n_capabilities);
+ // Resize and update storage manager data structures
+ storageAddCapabilities(n_capabilities, new_n_capabilities);
- // Update (Capability *) refs in the Task manager.
- updateCapabilityRefs();
+ // Update (Capability *) refs in the Task manager.
+ updateCapabilityRefs();
- // Update (Capability *) refs from TSOs
- for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
- for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
- t->cap = &capabilities[t->cap->no];
+ // Update (Capability *) refs from TSOs
+ for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
+ for (t = generations[g].threads; t != END_TSO_QUEUE; t = t->global_link) {
+ t->cap = &capabilities[t->cap->no];
+ }
+ }
}
}
@@ -1931,7 +2031,9 @@ setNumCapabilities (nat new_n_capabilities USED_IF_THREADS)
startWorkerTasks(n_capabilities, new_n_capabilities);
// finally, update n_capabilities
- n_capabilities = new_n_capabilities;
+ if (new_n_capabilities > n_capabilities) {
+ n_capabilities = enabled_capabilities = new_n_capabilities;
+ }
// We can't free the old array until now, because we access it
// while updating pointers in updateCapabilityRefs().
@@ -2177,7 +2279,7 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
// move this thread from now on.
#if defined(THREADED_RTS)
- cpu %= n_capabilities;
+ cpu %= enabled_capabilities;
if (cpu == cap->no) {
appendToRunQueue(cap,tso);
} else {
@@ -2332,10 +2434,11 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
// If we haven't killed all the threads yet, do it now.
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
- waitForReturnCapability(&task->cap,task);
- scheduleDoGC(task->cap,task,rtsFalse);
+ Capability *cap = task->cap;
+ waitForReturnCapability(&cap,task);
+ scheduleDoGC(&cap,task,rtsFalse);
ASSERT(task->incall->tso == NULL);
- releaseCapability(task->cap);
+ releaseCapability(cap);
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2394,15 +2497,16 @@ static void
performGC_(rtsBool force_major)
{
Task *task;
+ Capability *cap = NULL;
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
// suspended_ccalls queue.
task = newBoundTask();
- waitForReturnCapability(&task->cap,task);
- scheduleDoGC(task->cap,task,force_major);
- releaseCapability(task->cap);
+ waitForReturnCapability(&cap,task);
+ scheduleDoGC(&cap,task,force_major);
+ releaseCapability(cap);
boundTaskExiting(task);
}