diff options
author | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 11:38:07 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 15:19:18 +0000 |
commit | 8b75acd3ca25165536f18976c8d80cb62ad613e4 (patch) | |
tree | ccb87f6f5df2af15ca2ca8f65e5163b1f34886b8 /rts/Schedule.c | |
parent | 657773c8e59917fda05ee08065ec566aebb50a5f (diff) | |
download | haskell-8b75acd3ca25165536f18976c8d80cb62ad613e4.tar.gz |
Make forkProcess work with +RTS -N
Consider this experimental for the time being. There are a lot of
things that could go wrong, but I've verified that at least it works
on the test cases we have.
I also did some API cleanups while I was here. Previously we had:
Capability * rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret);
but this API is particularly error-prone: if you forget to discard the
Capability * you passed in and use the return value instead, then
you're in for subtle bugs with +RTS -N later on. So I changed all
these functions to this form:
void rts_eval (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
It's much harder to use this version incorrectly, because you have to
pass the Capability in by reference.
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 268 |
1 files changed, 182 insertions, 86 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index cd704d2871..70f6a3fc00 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -40,6 +40,7 @@ #include "Timer.h" #include "ThreadPaused.h" #include "Messages.h" +#include "Stable.h" #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> @@ -130,6 +131,10 @@ static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) static void scheduleYield (Capability **pcap, Task *task); #endif +#if defined(THREADED_RTS) +static nat requestSync (Capability **pcap, Task *task, nat sync_type); +static void acquireAllCapabilities(Capability *cap, Task *task); +#endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleProcessInbox(Capability *cap); @@ -617,7 +622,7 @@ shouldYieldCapability (Capability *cap, Task *task) // - the thread at the head of the run queue cannot be run // by this Task (it is bound to another Task, or it is unbound // and this task it bound). - return (waiting_for_gc || + return (pending_sync || cap->returning_tasks_hd != NULL || (!emptyRunQueue(cap) && (task->incall->tso == NULL ? cap->run_queue_hd->bound != NULL @@ -1319,6 +1324,72 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED ) } /* ----------------------------------------------------------------------------- + * Start a synchronisation of all capabilities + * -------------------------------------------------------------------------- */ + +// Returns: +// 0 if we successfully got a sync +// non-0 if there was another sync request in progress, +// and we yielded to it. The value returned is the +// type of the other sync request. +// +#if defined(THREADED_RTS) +static nat requestSync (Capability **pcap, Task *task, nat sync_type) +{ + nat prev_pending_sync; + + prev_pending_sync = cas(&pending_sync, 0, sync_type); + + if (prev_pending_sync) + { + do { + debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...", + prev_pending_sync); + ASSERT(*pcap); + yieldCapability(pcap,task); + } while (pending_sync); + return prev_pending_sync; // NOTE: task->cap might have changed now + } + else + { + return 0; + } +} + +// +// Grab all the capabilities except the one we already hold. Used +// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and +// before a fork (SYNC_FORK). +// +// Only call this after requestSync(), otherwise a deadlock might +// ensue if another thread is trying to synchronise. +// +static void acquireAllCapabilities(Capability *cap, Task *task) +{ + Capability *tmpcap; + nat i; + + for (i=0; i < n_capabilities; i++) { + debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities); + tmpcap = &capabilities[i]; + if (tmpcap != cap) { + // we better hope this task doesn't get migrated to + // another Capability while we're waiting for this one. + // It won't, because load balancing happens while we have + // all the Capabilities, but even so it's a slightly + // unsavoury invariant. + task->cap = tmpcap; + waitForReturnCapability(&tmpcap, task); + if (tmpcap != &capabilities[i]) { + barf("acquireAllCapabilities: got the wrong capability"); + } + } + } +} + +#endif + +/* ----------------------------------------------------------------------------- * Perform a garbage collection if necessary * -------------------------------------------------------------------------- */ @@ -1327,10 +1398,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { rtsBool heap_census; #ifdef THREADED_RTS - /* extern static volatile StgWord waiting_for_gc; - lives inside capability.c */ - rtsBool gc_type, prev_pending_gc; - nat i; + rtsBool gc_type; + nat i, sync; #endif if (sched_state == SCHED_SHUTTING_DOWN) { @@ -1346,9 +1415,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) && N >= RtsFlags.ParFlags.parGcGen && ! oldest_gen->mark) { - gc_type = PENDING_GC_PAR; + gc_type = SYNC_GC_PAR; } else { - gc_type = PENDING_GC_SEQ; + gc_type = SYNC_GC_SEQ; } // In order to GC, there must be no threads running Haskell code. @@ -1363,26 +1432,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // /* Other capabilities are prevented from running yet more Haskell - threads if waiting_for_gc is set. Tested inside + threads if pending_sync is set. Tested inside yieldCapability() and releaseCapability() in Capability.c */ - prev_pending_gc = cas(&waiting_for_gc, 0, gc_type); - if (prev_pending_gc) { - do { - debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", - prev_pending_gc); - ASSERT(cap); - yieldCapability(&cap,task); - } while (waiting_for_gc); - return cap; // NOTE: task->cap might have changed here - } + do { + sync = requestSync(&cap, task, gc_type); + if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) { + // someone else had a pending sync request for a GC, so + // let's assume GC has been done and we don't need to GC + // again. + return cap; + } + } while (sync); interruptAllCapabilities(); // The final shutdown GC is always single-threaded, because it's // possible that some of the Capabilities have no worker threads. - if (gc_type == PENDING_GC_SEQ) + if (gc_type == SYNC_GC_SEQ) { traceEventRequestSeqGc(cap); } @@ -1392,25 +1460,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } - if (gc_type == PENDING_GC_SEQ) + if (gc_type == SYNC_GC_SEQ) { // single-threaded GC: grab all the capabilities - for (i=0; i < n_capabilities; i++) { - debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); - if (cap != &capabilities[i]) { - Capability *pcap = &capabilities[i]; - // we better hope this task doesn't get migrated to - // another Capability while we're waiting for this one. - // It won't, because load balancing happens while we have - // all the Capabilities, but even so it's a slightly - // unsavoury invariant. - task->cap = pcap; - waitForReturnCapability(&pcap, task); - if (pcap != &capabilities[i]) { - barf("scheduleDoGC: got the wrong capability"); - } - } - } + acquireAllCapabilities(cap,task); } else { @@ -1455,9 +1508,9 @@ delete_threads_and_gc: traceEventGcStart(cap); #if defined(THREADED_RTS) - // reset waiting_for_gc *before* GC, so that when the GC threads + // reset pending_sync *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. - waiting_for_gc = 0; + pending_sync = 0; GarbageCollect(force_major || heap_census, heap_census, gc_type, cap); #else GarbageCollect(force_major || heap_census, heap_census, 0, cap); @@ -1494,7 +1547,7 @@ delete_threads_and_gc: } #if defined(THREADED_RTS) - if (gc_type == PENDING_GC_PAR) + if (gc_type == SYNC_GC_PAR) { releaseGCThreads(cap); } @@ -1526,7 +1579,7 @@ delete_threads_and_gc: #endif #if defined(THREADED_RTS) - if (gc_type == PENDING_GC_SEQ) { + if (gc_type == SYNC_GC_SEQ) { // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { if (cap != &capabilities[i]) { @@ -1561,26 +1614,41 @@ forkProcess(HsStablePtr *entry StgTSO* t,*next; Capability *cap; nat g; - -#if defined(THREADED_RTS) - if (RtsFlags.ParFlags.nNodes > 1) { - errorBelch("forking not supported with +RTS -N<n> greater than 1"); - stg_exit(EXIT_FAILURE); - } + Task *task = NULL; + nat i; +#ifdef THREADED_RTS + nat sync; #endif debugTrace(DEBUG_sched, "forking!"); - // ToDo: for SMP, we should probably acquire *all* the capabilities - cap = rts_lock(); - + task = newBoundTask(); + + cap = NULL; + waitForReturnCapability(&cap, task); + +#ifdef THREADED_RTS + do { + sync = requestSync(&cap, task, SYNC_FORK); + } while (sync); + + acquireAllCapabilities(cap,task); + + pending_sync = 0; +#endif + // no funny business: hold locks while we fork, otherwise if some // other thread is holding a lock when the fork happens, the data // structure protected by the lock will forever be in an // inconsistent state in the child. See also #1391. ACQUIRE_LOCK(&sched_mutex); - ACQUIRE_LOCK(&cap->lock); - ACQUIRE_LOCK(&cap->running_task->lock); + ACQUIRE_LOCK(&sm_mutex); + ACQUIRE_LOCK(&stable_mutex); + ACQUIRE_LOCK(&task->lock); + + for (i=0; i < n_capabilities; i++) { + ACQUIRE_LOCK(&capabilities[i].lock); + } stopTimer(); // See #4074 @@ -1595,19 +1663,30 @@ forkProcess(HsStablePtr *entry startTimer(); // #4074 RELEASE_LOCK(&sched_mutex); - RELEASE_LOCK(&cap->lock); - RELEASE_LOCK(&cap->running_task->lock); + RELEASE_LOCK(&sm_mutex); + RELEASE_LOCK(&stable_mutex); + RELEASE_LOCK(&task->lock); + + for (i=0; i < n_capabilities; i++) { + releaseCapability_(&capabilities[i],rtsFalse); + RELEASE_LOCK(&capabilities[i].lock); + } + boundTaskExiting(task); // just return the pid - rts_unlock(cap); - return pid; + return pid; } else { // child #if defined(THREADED_RTS) initMutex(&sched_mutex); - initMutex(&cap->lock); - initMutex(&cap->running_task->lock); + initMutex(&sm_mutex); + initMutex(&stable_mutex); + initMutex(&task->lock); + + for (i=0; i < n_capabilities; i++) { + initMutex(&capabilities[i].lock); + } #endif #ifdef TRACING @@ -1626,7 +1705,7 @@ forkProcess(HsStablePtr *entry // don't allow threads to catch the ThreadKilled // exception, but we do want to raiseAsync() because these // threads may be evaluating thunks that we need later. - deleteThread_(cap,t); + deleteThread_(t->cap,t); // stop the GC from updating the InCall to point to // the TSO. This is only necessary because the @@ -1637,44 +1716,58 @@ forkProcess(HsStablePtr *entry } } - // Empty the run queue. It seems tempting to let all the - // killed threads stay on the run queue as zombies to be - // cleaned up later, but some of them correspond to bound - // threads for which the corresponding Task does not exist. - cap->run_queue_hd = END_TSO_QUEUE; - cap->run_queue_tl = END_TSO_QUEUE; - - // Any suspended C-calling Tasks are no more, their OS threads - // don't exist now: - cap->suspended_ccalls = NULL; - - // Empty the threads lists. Otherwise, the garbage + discardTasksExcept(task); + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + + // Empty the run queue. It seems tempting to let all the + // killed threads stay on the run queue as zombies to be + // cleaned up later, but some of them may correspond to + // bound threads for which the corresponding Task does not + // exist. + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + + // Any suspended C-calling Tasks are no more, their OS threads + // don't exist now: + cap->suspended_ccalls = NULL; + +#if defined(THREADED_RTS) + // Wipe our spare workers list, they no longer exist. New + // workers will be created if necessary. + cap->spare_workers = NULL; + cap->n_spare_workers = 0; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; +#endif + + // Release all caps except 0, we'll use that for starting + // the IO manager and running the client action below. + if (cap->no != 0) { + task->cap = cap; + releaseCapability(cap); + } + } + cap = &capabilities[0]; + task->cap = cap; + + // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. for (g = 0; g < RtsFlags.GcFlags.generations; g++) { generations[g].threads = END_TSO_QUEUE; } - discardTasksExcept(cap->running_task); - -#if defined(THREADED_RTS) - // Wipe our spare workers list, they no longer exist. New - // workers will be created if necessary. - cap->spare_workers = NULL; - cap->n_spare_workers = 0; - cap->returning_tasks_hd = NULL; - cap->returning_tasks_tl = NULL; -#endif - // On Unix, all timers are reset in the child, so we need to start // the timer again. initTimer(); startTimer(); #if defined(THREADED_RTS) - cap = ioManagerStartCap(cap); + ioManagerStartCap(&cap); #endif - cap = rts_evalStableIO(cap, entry, NULL); // run the action + rts_evalStableIO(&cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); rts_unlock(cap); @@ -1928,11 +2021,14 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) #endif } -Capability * -scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) +void +scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap) { Task *task; DEBUG_ONLY( StgThreadID id ); + Capability *cap; + + cap = *pcap; // We already created/initialised the Task task = cap->running_task; @@ -1957,7 +2053,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); - return cap; + *pcap = cap; } /* ---------------------------------------------------------------------------- |