diff options
author | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 11:38:07 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2011-12-06 15:19:18 +0000 |
commit | 8b75acd3ca25165536f18976c8d80cb62ad613e4 (patch) | |
tree | ccb87f6f5df2af15ca2ca8f65e5163b1f34886b8 | |
parent | 657773c8e59917fda05ee08065ec566aebb50a5f (diff) | |
download | haskell-8b75acd3ca25165536f18976c8d80cb62ad613e4.tar.gz |
Make forkProcess work with +RTS -N
Consider this experimental for the time being. There are a lot of
things that could go wrong, but I've verified that at least it works
on the test cases we have.
I also did some API cleanups while I was here. Previously we had:
Capability * rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret);
but this API is particularly error-prone: if you keep using the
Capability * you passed in instead of switching to the returned one,
you're in for subtle bugs with +RTS -N later on. So I changed all
these functions to this form:
void rts_eval (/* inout */ Capability **cap,
/* in */ HaskellObj p,
/* out */ HaskellObj *ret)
It's much harder to use this version incorrectly, because you have to
pass the Capability in by reference.
-rw-r--r-- | compiler/deSugar/DsForeign.lhs | 4 | ||||
-rw-r--r-- | includes/RtsAPI.h | 48 | ||||
-rw-r--r-- | includes/rts/Threads.h | 5 | ||||
-rw-r--r-- | rts/Capability.c | 17 | ||||
-rw-r--r-- | rts/Capability.h | 13 | ||||
-rw-r--r-- | rts/RtsAPI.c | 64 | ||||
-rw-r--r-- | rts/RtsMain.c | 2 | ||||
-rw-r--r-- | rts/RtsStartup.c | 2 | ||||
-rw-r--r-- | rts/Schedule.c | 268 | ||||
-rw-r--r-- | rts/Stable.c | 2 | ||||
-rw-r--r-- | rts/Stable.h | 5 | ||||
-rw-r--r-- | rts/posix/OSThreads.c | 2 | ||||
-rw-r--r-- | rts/posix/Signals.c | 9 | ||||
-rw-r--r-- | rts/posix/Signals.h | 2 | ||||
-rw-r--r-- | rts/sm/GC.c | 2 |
15 files changed, 285 insertions, 160 deletions
diff --git a/compiler/deSugar/DsForeign.lhs b/compiler/deSugar/DsForeign.lhs index 6f9bbc2ef8..30d4af9804 100644 --- a/compiler/deSugar/DsForeign.lhs +++ b/compiler/deSugar/DsForeign.lhs @@ -609,8 +609,8 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc , declareCResult , text "cap = rts_lock();" -- create the application + perform it. - , ptext (sLit "cap=rts_evalIO") <> parens ( - cap <> + , ptext (sLit "rts_evalIO") <> parens ( + char '&' <> cap <> ptext (sLit "rts_apply") <> parens ( cap <> text "(HaskellObj)" diff --git a/includes/RtsAPI.h b/includes/RtsAPI.h index 8d948f9b49..e3b3f7d5f5 100644 --- a/includes/RtsAPI.h +++ b/includes/RtsAPI.h @@ -181,32 +181,44 @@ HsBool rts_getBool ( HaskellObj ); The versions ending in '_' allow you to specify an initial stack size. Note that these calls may cause Garbage Collection, so all HaskellObj references are rendered invalid by these calls. + + All of these functions take a (Capability **) - there is a + Capability pointer both input and output. We use an inout + parameter because this is less error-prone for the client than a + return value - the client could easily forget to use the return + value, whereas incorrectly using an inout parameter will usually + result in a type error. 
------------------------------------------------------------------------- */ -Capability * -rts_eval (Capability *, HaskellObj p, /*out*/HaskellObj *ret); -Capability * -rts_eval_ (Capability *, HaskellObj p, unsigned int stack_size, - /*out*/HaskellObj *ret); +void rts_eval (/* inout */ Capability **, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret); + +void rts_eval_ (/* inout */ Capability **, + /* in */ HaskellObj p, + /* in */ unsigned int stack_size, + /* out */ HaskellObj *ret); -Capability * -rts_evalIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret); +void rts_evalIO (/* inout */ Capability **, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret); -Capability * -rts_evalStableIO (Capability *, HsStablePtr s, /*out*/HsStablePtr *ret); +void rts_evalStableIO (/* inout */ Capability **, + /* in */ HsStablePtr s, + /* out */ HsStablePtr *ret); -Capability * -rts_evalLazyIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret); +void rts_evalLazyIO (/* inout */ Capability **, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret); -Capability * -rts_evalLazyIO_ (Capability *, HaskellObj p, unsigned int stack_size, - /*out*/HaskellObj *ret); +void rts_evalLazyIO_ (/* inout */ Capability **, + /* in */ HaskellObj p, + /* in */ unsigned int stack_size, + /* out */ HaskellObj *ret); -void -rts_checkSchedStatus (char* site, Capability *); +void rts_checkSchedStatus (char* site, Capability *); -SchedulerStatus -rts_getSchedStatus (Capability *cap); +SchedulerStatus rts_getSchedStatus (Capability *cap); /* -------------------------------------------------------------------------- Wrapper closures diff --git a/includes/rts/Threads.h b/includes/rts/Threads.h index c974142ce3..d2c4aff984 100644 --- a/includes/rts/Threads.h +++ b/includes/rts/Threads.h @@ -20,8 +20,9 @@ // StgTSO *createThread (Capability *cap, nat stack_size); -Capability *scheduleWaitThread (StgTSO *tso, /*out*/HaskellObj* ret, - Capability *cap); +void scheduleWaitThread (/* in */ StgTSO *tso, 
+ /* out */ HaskellObj* ret, + /* inout */ Capability **cap); StgTSO *createGenThread (Capability *cap, nat stack_size, StgClosure *closure); diff --git a/rts/Capability.c b/rts/Capability.c index 26e420970b..4d23f71a86 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -40,8 +40,12 @@ Capability *capabilities = NULL; // locking, so we don't do that. Capability *last_free_capability = NULL; -/* GC indicator, in scope for the scheduler, init'ed to false */ -volatile StgWord waiting_for_gc = 0; +/* + * Indicates that the RTS wants to synchronise all the Capabilities + * for some reason. All Capabilities should stop and return to the + * scheduler. + */ +volatile StgWord pending_sync = 0; /* Let foreign code get the current Capability -- assuming there is one! * This is useful for unsafe foreign calls because they are called with @@ -422,13 +426,12 @@ releaseCapability_ (Capability* cap, return; } - if (waiting_for_gc == PENDING_GC_SEQ) { + if (pending_sync == SYNC_GC_SEQ || pending_sync == SYNC_FORK) { last_free_capability = cap; // needed? - debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no); + debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no); return; } - // If the next thread on the run queue is a bound thread, // give this Capability to the appropriate Task. 
if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) { @@ -536,7 +539,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS) #endif /* ---------------------------------------------------------------------------- - * waitForReturnCapability( Task *task ) + * waitForReturnCapability (Capability **pCap, Task *task) * * Purpose: when an OS thread returns from an external call, * it calls waitForReturnCapability() (via Schedule.resumeThread()) @@ -643,7 +646,7 @@ yieldCapability (Capability** pCap, Task *task) { Capability *cap = *pCap; - if (waiting_for_gc == PENDING_GC_PAR) { + if (pending_sync == SYNC_GC_PAR) { traceEventGcStart(cap); gcWorkerThread(cap); traceEventGcEnd(cap); diff --git a/rts/Capability.h b/rts/Capability.h index 1957487329..033806b3be 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -199,10 +199,15 @@ extern Capability *capabilities; // extern Capability *last_free_capability; -// GC indicator, in scope for the scheduler -#define PENDING_GC_SEQ 1 -#define PENDING_GC_PAR 2 -extern volatile StgWord waiting_for_gc; +// +// Indicates that the RTS wants to synchronise all the Capabilities +// for some reason. All Capabilities should stop and return to the +// scheduler. +// +#define SYNC_GC_SEQ 1 +#define SYNC_GC_PAR 2 +#define SYNC_FORK 3 +extern volatile StgWord pending_sync; // Acquires a capability at a return point. 
If *cap is non-NULL, then // this is taken as a preference for the Capability we wish to diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c index 8fcf8ce812..0463f15ad8 100644 --- a/rts/RtsAPI.c +++ b/rts/RtsAPI.c @@ -421,36 +421,39 @@ createStrictIOThread(Capability *cap, nat stack_size, StgClosure *closure) Evaluating Haskell expressions ------------------------------------------------------------------------- */ -Capability * -rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) +void rts_eval (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret) { StgTSO *tso; - tso = createGenThread(cap, RtsFlags.GcFlags.initialStkSize, p); - return scheduleWaitThread(tso,ret,cap); + tso = createGenThread(*cap, RtsFlags.GcFlags.initialStkSize, p); + scheduleWaitThread(tso,ret,cap); } -Capability * -rts_eval_ (Capability *cap, HaskellObj p, unsigned int stack_size, - /*out*/HaskellObj *ret) +void rts_eval_ (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* in */ unsigned int stack_size, + /* out */ HaskellObj *ret) { StgTSO *tso; - tso = createGenThread(cap, stack_size, p); - return scheduleWaitThread(tso,ret,cap); + tso = createGenThread(*cap, stack_size, p); + scheduleWaitThread(tso,ret,cap); } /* * rts_evalIO() evaluates a value of the form (IO a), forcing the action's * result to WHNF before returning. */ -Capability * -rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) +void rts_evalIO (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret) { StgTSO* tso; - tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); - return scheduleWaitThread(tso,ret,cap); + tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p); + scheduleWaitThread(tso,ret,cap); } /* @@ -459,49 +462,50 @@ rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) * action's result to WHNF before returning. The result is returned * in a StablePtr. 
*/ -Capability * -rts_evalStableIO (Capability *cap, HsStablePtr s, /*out*/HsStablePtr *ret) +void rts_evalStableIO (/* inout */ Capability **cap, + /* in */ HsStablePtr s, + /* out */ HsStablePtr *ret) { StgTSO* tso; StgClosure *p, *r; SchedulerStatus stat; - + p = (StgClosure *)deRefStablePtr(s); - tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); + tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p); // async exceptions are always blocked by default in the created // thread. See #1048. tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE; - cap = scheduleWaitThread(tso,&r,cap); - stat = rts_getSchedStatus(cap); + scheduleWaitThread(tso,&r,cap); + stat = rts_getSchedStatus(*cap); if (stat == Success && ret != NULL) { ASSERT(r != NULL); *ret = getStablePtr((StgPtr)r); } - - return cap; } /* * Like rts_evalIO(), but doesn't force the action's result. */ -Capability * -rts_evalLazyIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret) +void rts_evalLazyIO (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* out */ HaskellObj *ret) { StgTSO *tso; - tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p); - return scheduleWaitThread(tso,ret,cap); + tso = createIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p); + scheduleWaitThread(tso,ret,cap); } -Capability * -rts_evalLazyIO_ (Capability *cap, HaskellObj p, unsigned int stack_size, - /*out*/HaskellObj *ret) +void rts_evalLazyIO_ (/* inout */ Capability **cap, + /* in */ HaskellObj p, + /* in */ unsigned int stack_size, + /* out */ HaskellObj *ret) { StgTSO *tso; - tso = createIOThread(cap, stack_size, p); - return scheduleWaitThread(tso,ret,cap); + tso = createIOThread(*cap, stack_size, p); + scheduleWaitThread(tso,ret,cap); } /* Convenience function for decoding the returned status. 
*/ diff --git a/rts/RtsMain.c b/rts/RtsMain.c index 0f6ca82382..2084435f16 100644 --- a/rts/RtsMain.c +++ b/rts/RtsMain.c @@ -60,7 +60,7 @@ static void real_main(void) /* ToDo: want to start with a larger stack size */ { Capability *cap = rts_lock(); - cap = rts_evalLazyIO(cap,progmain_closure, NULL); + rts_evalLazyIO(&cap,progmain_closure, NULL); status = rts_getSchedStatus(cap); taskTimeStamp(myTask()); rts_unlock(cap); diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c index c451292012..c63f85ee82 100644 --- a/rts/RtsStartup.c +++ b/rts/RtsStartup.c @@ -431,7 +431,7 @@ static void flushStdHandles(void) { Capability *cap; cap = rts_lock(); - cap = rts_evalIO(cap, flushStdHandles_closure, NULL); + rts_evalIO(&cap, flushStdHandles_closure, NULL); rts_unlock(cap); } diff --git a/rts/Schedule.c b/rts/Schedule.c index cd704d2871..70f6a3fc00 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -40,6 +40,7 @@ #include "Timer.h" #include "ThreadPaused.h" #include "Messages.h" +#include "Stable.h" #ifdef HAVE_SYS_TYPES_H #include <sys/types.h> @@ -130,6 +131,10 @@ static void scheduleFindWork (Capability *cap); #if defined(THREADED_RTS) static void scheduleYield (Capability **pcap, Task *task); #endif +#if defined(THREADED_RTS) +static nat requestSync (Capability **pcap, Task *task, nat sync_type); +static void acquireAllCapabilities(Capability *cap, Task *task); +#endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleProcessInbox(Capability *cap); @@ -617,7 +622,7 @@ shouldYieldCapability (Capability *cap, Task *task) // - the thread at the head of the run queue cannot be run // by this Task (it is bound to another Task, or it is unbound // and this task it bound). - return (waiting_for_gc || + return (pending_sync || cap->returning_tasks_hd != NULL || (!emptyRunQueue(cap) && (task->incall->tso == NULL ? 
cap->run_queue_hd->bound != NULL @@ -1319,6 +1324,72 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED ) } /* ----------------------------------------------------------------------------- + * Start a synchronisation of all capabilities + * -------------------------------------------------------------------------- */ + +// Returns: +// 0 if we successfully got a sync +// non-0 if there was another sync request in progress, +// and we yielded to it. The value returned is the +// type of the other sync request. +// +#if defined(THREADED_RTS) +static nat requestSync (Capability **pcap, Task *task, nat sync_type) +{ + nat prev_pending_sync; + + prev_pending_sync = cas(&pending_sync, 0, sync_type); + + if (prev_pending_sync) + { + do { + debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...", + prev_pending_sync); + ASSERT(*pcap); + yieldCapability(pcap,task); + } while (pending_sync); + return prev_pending_sync; // NOTE: task->cap might have changed now + } + else + { + return 0; + } +} + +// +// Grab all the capabilities except the one we already hold. Used +// when synchronising before a single-threaded GC (SYNC_GC_SEQ), and +// before a fork (SYNC_FORK). +// +// Only call this after requestSync(), otherwise a deadlock might +// ensue if another thread is trying to synchronise. +// +static void acquireAllCapabilities(Capability *cap, Task *task) +{ + Capability *tmpcap; + nat i; + + for (i=0; i < n_capabilities; i++) { + debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities); + tmpcap = &capabilities[i]; + if (tmpcap != cap) { + // we better hope this task doesn't get migrated to + // another Capability while we're waiting for this one. + // It won't, because load balancing happens while we have + // all the Capabilities, but even so it's a slightly + // unsavoury invariant. 
+ task->cap = tmpcap; + waitForReturnCapability(&tmpcap, task); + if (tmpcap != &capabilities[i]) { + barf("acquireAllCapabilities: got the wrong capability"); + } + } + } +} + +#endif + +/* ----------------------------------------------------------------------------- * Perform a garbage collection if necessary * -------------------------------------------------------------------------- */ @@ -1327,10 +1398,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) { rtsBool heap_census; #ifdef THREADED_RTS - /* extern static volatile StgWord waiting_for_gc; - lives inside capability.c */ - rtsBool gc_type, prev_pending_gc; - nat i; + rtsBool gc_type; + nat i, sync; #endif if (sched_state == SCHED_SHUTTING_DOWN) { @@ -1346,9 +1415,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) && N >= RtsFlags.ParFlags.parGcGen && ! oldest_gen->mark) { - gc_type = PENDING_GC_PAR; + gc_type = SYNC_GC_PAR; } else { - gc_type = PENDING_GC_SEQ; + gc_type = SYNC_GC_SEQ; } // In order to GC, there must be no threads running Haskell code. @@ -1363,26 +1432,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) // /* Other capabilities are prevented from running yet more Haskell - threads if waiting_for_gc is set. Tested inside + threads if pending_sync is set. Tested inside yieldCapability() and releaseCapability() in Capability.c */ - prev_pending_gc = cas(&waiting_for_gc, 0, gc_type); - if (prev_pending_gc) { - do { - debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...", - prev_pending_gc); - ASSERT(cap); - yieldCapability(&cap,task); - } while (waiting_for_gc); - return cap; // NOTE: task->cap might have changed here - } + do { + sync = requestSync(&cap, task, gc_type); + if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) { + // someone else had a pending sync request for a GC, so + // let's assume GC has been done and we don't need to GC + // again. 
+ return cap; + } + } while (sync); interruptAllCapabilities(); // The final shutdown GC is always single-threaded, because it's // possible that some of the Capabilities have no worker threads. - if (gc_type == PENDING_GC_SEQ) + if (gc_type == SYNC_GC_SEQ) { traceEventRequestSeqGc(cap); } @@ -1392,25 +1460,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads"); } - if (gc_type == PENDING_GC_SEQ) + if (gc_type == SYNC_GC_SEQ) { // single-threaded GC: grab all the capabilities - for (i=0; i < n_capabilities; i++) { - debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities); - if (cap != &capabilities[i]) { - Capability *pcap = &capabilities[i]; - // we better hope this task doesn't get migrated to - // another Capability while we're waiting for this one. - // It won't, because load balancing happens while we have - // all the Capabilities, but even so it's a slightly - // unsavoury invariant. - task->cap = pcap; - waitForReturnCapability(&pcap, task); - if (pcap != &capabilities[i]) { - barf("scheduleDoGC: got the wrong capability"); - } - } - } + acquireAllCapabilities(cap,task); } else { @@ -1455,9 +1508,9 @@ delete_threads_and_gc: traceEventGcStart(cap); #if defined(THREADED_RTS) - // reset waiting_for_gc *before* GC, so that when the GC threads + // reset pending_sync *before* GC, so that when the GC threads // emerge they don't immediately re-enter the GC. 
- waiting_for_gc = 0; + pending_sync = 0; GarbageCollect(force_major || heap_census, heap_census, gc_type, cap); #else GarbageCollect(force_major || heap_census, heap_census, 0, cap); @@ -1494,7 +1547,7 @@ delete_threads_and_gc: } #if defined(THREADED_RTS) - if (gc_type == PENDING_GC_PAR) + if (gc_type == SYNC_GC_PAR) { releaseGCThreads(cap); } @@ -1526,7 +1579,7 @@ delete_threads_and_gc: #endif #if defined(THREADED_RTS) - if (gc_type == PENDING_GC_SEQ) { + if (gc_type == SYNC_GC_SEQ) { // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { if (cap != &capabilities[i]) { @@ -1561,26 +1614,41 @@ forkProcess(HsStablePtr *entry StgTSO* t,*next; Capability *cap; nat g; - -#if defined(THREADED_RTS) - if (RtsFlags.ParFlags.nNodes > 1) { - errorBelch("forking not supported with +RTS -N<n> greater than 1"); - stg_exit(EXIT_FAILURE); - } + Task *task = NULL; + nat i; +#ifdef THREADED_RTS + nat sync; #endif debugTrace(DEBUG_sched, "forking!"); - // ToDo: for SMP, we should probably acquire *all* the capabilities - cap = rts_lock(); - + task = newBoundTask(); + + cap = NULL; + waitForReturnCapability(&cap, task); + +#ifdef THREADED_RTS + do { + sync = requestSync(&cap, task, SYNC_FORK); + } while (sync); + + acquireAllCapabilities(cap,task); + + pending_sync = 0; +#endif + // no funny business: hold locks while we fork, otherwise if some // other thread is holding a lock when the fork happens, the data // structure protected by the lock will forever be in an // inconsistent state in the child. See also #1391. 
ACQUIRE_LOCK(&sched_mutex); - ACQUIRE_LOCK(&cap->lock); - ACQUIRE_LOCK(&cap->running_task->lock); + ACQUIRE_LOCK(&sm_mutex); + ACQUIRE_LOCK(&stable_mutex); + ACQUIRE_LOCK(&task->lock); + + for (i=0; i < n_capabilities; i++) { + ACQUIRE_LOCK(&capabilities[i].lock); + } stopTimer(); // See #4074 @@ -1595,19 +1663,30 @@ forkProcess(HsStablePtr *entry startTimer(); // #4074 RELEASE_LOCK(&sched_mutex); - RELEASE_LOCK(&cap->lock); - RELEASE_LOCK(&cap->running_task->lock); + RELEASE_LOCK(&sm_mutex); + RELEASE_LOCK(&stable_mutex); + RELEASE_LOCK(&task->lock); + + for (i=0; i < n_capabilities; i++) { + releaseCapability_(&capabilities[i],rtsFalse); + RELEASE_LOCK(&capabilities[i].lock); + } + boundTaskExiting(task); // just return the pid - rts_unlock(cap); - return pid; + return pid; } else { // child #if defined(THREADED_RTS) initMutex(&sched_mutex); - initMutex(&cap->lock); - initMutex(&cap->running_task->lock); + initMutex(&sm_mutex); + initMutex(&stable_mutex); + initMutex(&task->lock); + + for (i=0; i < n_capabilities; i++) { + initMutex(&capabilities[i].lock); + } #endif #ifdef TRACING @@ -1626,7 +1705,7 @@ forkProcess(HsStablePtr *entry // don't allow threads to catch the ThreadKilled // exception, but we do want to raiseAsync() because these // threads may be evaluating thunks that we need later. - deleteThread_(cap,t); + deleteThread_(t->cap,t); // stop the GC from updating the InCall to point to // the TSO. This is only necessary because the @@ -1637,44 +1716,58 @@ forkProcess(HsStablePtr *entry } } - // Empty the run queue. It seems tempting to let all the - // killed threads stay on the run queue as zombies to be - // cleaned up later, but some of them correspond to bound - // threads for which the corresponding Task does not exist. - cap->run_queue_hd = END_TSO_QUEUE; - cap->run_queue_tl = END_TSO_QUEUE; - - // Any suspended C-calling Tasks are no more, their OS threads - // don't exist now: - cap->suspended_ccalls = NULL; - - // Empty the threads lists. 
Otherwise, the garbage + discardTasksExcept(task); + + for (i=0; i < n_capabilities; i++) { + cap = &capabilities[i]; + + // Empty the run queue. It seems tempting to let all the + // killed threads stay on the run queue as zombies to be + // cleaned up later, but some of them may correspond to + // bound threads for which the corresponding Task does not + // exist. + cap->run_queue_hd = END_TSO_QUEUE; + cap->run_queue_tl = END_TSO_QUEUE; + + // Any suspended C-calling Tasks are no more, their OS threads + // don't exist now: + cap->suspended_ccalls = NULL; + +#if defined(THREADED_RTS) + // Wipe our spare workers list, they no longer exist. New + // workers will be created if necessary. + cap->spare_workers = NULL; + cap->n_spare_workers = 0; + cap->returning_tasks_hd = NULL; + cap->returning_tasks_tl = NULL; +#endif + + // Release all caps except 0, we'll use that for starting + // the IO manager and running the client action below. + if (cap->no != 0) { + task->cap = cap; + releaseCapability(cap); + } + } + cap = &capabilities[0]; + task->cap = cap; + + // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. for (g = 0; g < RtsFlags.GcFlags.generations; g++) { generations[g].threads = END_TSO_QUEUE; } - discardTasksExcept(cap->running_task); - -#if defined(THREADED_RTS) - // Wipe our spare workers list, they no longer exist. New - // workers will be created if necessary. - cap->spare_workers = NULL; - cap->n_spare_workers = 0; - cap->returning_tasks_hd = NULL; - cap->returning_tasks_tl = NULL; -#endif - // On Unix, all timers are reset in the child, so we need to start // the timer again. 
initTimer(); startTimer(); #if defined(THREADED_RTS) - cap = ioManagerStartCap(cap); + ioManagerStartCap(&cap); #endif - cap = rts_evalStableIO(cap, entry, NULL); // run the action + rts_evalStableIO(&cap, entry, NULL); // run the action rts_checkSchedStatus("forkProcess",cap); rts_unlock(cap); @@ -1928,11 +2021,14 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso) #endif } -Capability * -scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) +void +scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap) { Task *task; DEBUG_ONLY( StgThreadID id ); + Capability *cap; + + cap = *pcap; // We already created/initialised the Task task = cap->running_task; @@ -1957,7 +2053,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id); - return cap; + *pcap = cap; } /* ---------------------------------------------------------------------------- diff --git a/rts/Stable.c b/rts/Stable.c index c46f8b2b9e..39b26173d8 100644 --- a/rts/Stable.c +++ b/rts/Stable.c @@ -77,7 +77,7 @@ static snEntry *stable_ptr_free = NULL; static unsigned int SPT_size = 0; #ifdef THREADED_RTS -static Mutex stable_mutex; +Mutex stable_mutex; #endif static void enlargeStablePtrTable(void); diff --git a/rts/Stable.h b/rts/Stable.h index d7b7f8bb1e..bec932af97 100644 --- a/rts/Stable.h +++ b/rts/Stable.h @@ -33,6 +33,11 @@ void updateStablePtrTable ( rtsBool full ); void stablePtrPreGC ( void ); void stablePtrPostGC ( void ); +#ifdef THREADED_RTS +// needed by Schedule.c:forkProcess() +extern Mutex stable_mutex; +#endif + #include "EndPrivate.h" #endif /* STABLE_H */ diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c index cc3baeb1bb..e25cdc1e81 100644 --- a/rts/posix/OSThreads.c +++ b/rts/posix/OSThreads.c @@ -197,7 +197,7 @@ forkOS_createThreadWrapper ( void * entry ) { Capability *cap; 
cap = rts_lock(); - cap = rts_evalStableIO(cap, (HsStablePtr) entry, NULL); + rts_evalStableIO(&cap, (HsStablePtr) entry, NULL); taskTimeStamp(myTask()); rts_unlock(cap); return NULL; diff --git a/rts/posix/Signals.c b/rts/posix/Signals.c index 9f5bf9f370..38c9792552 100644 --- a/rts/posix/Signals.c +++ b/rts/posix/Signals.c @@ -145,11 +145,10 @@ ioManagerDie (void) } } -Capability * -ioManagerStartCap (Capability *cap) +void +ioManagerStartCap (Capability **cap) { - return rts_evalIO( - cap,&base_GHCziConcziIO_ensureIOManagerIsRunning_closure,NULL); + rts_evalIO(cap,&base_GHCziConcziIO_ensureIOManagerIsRunning_closure,NULL); } void @@ -159,7 +158,7 @@ ioManagerStart (void) Capability *cap; if (io_manager_control_fd < 0 || io_manager_wakeup_fd < 0) { cap = rts_lock(); - cap = ioManagerStartCap(cap); + ioManagerStartCap(&cap); rts_unlock(cap); } } diff --git a/rts/posix/Signals.h b/rts/posix/Signals.h index 7235559915..387d688912 100644 --- a/rts/posix/Signals.h +++ b/rts/posix/Signals.h @@ -24,7 +24,7 @@ extern siginfo_t *next_pending_handler; void startSignalHandlers(Capability *cap); #endif -Capability *ioManagerStartCap (Capability *cap); +void ioManagerStartCap (/* inout */ Capability **cap); extern StgInt *signal_handlers; diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 88d5a02f89..a4ac1fb01d 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -259,7 +259,7 @@ GarbageCollect (rtsBool force_major_gc, * We don't try to parallelise minor GCs (unless the user asks for * it with +RTS -gn0), or mark/compact/sweep GC. */ - if (gc_type == PENDING_GC_PAR) { + if (gc_type == SYNC_GC_PAR) { n_gc_threads = RtsFlags.ParFlags.nNodes; } else { n_gc_threads = 1; |