summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2011-12-06 11:38:07 +0000
committerSimon Marlow <marlowsd@gmail.com>2011-12-06 15:19:18 +0000
commit8b75acd3ca25165536f18976c8d80cb62ad613e4 (patch)
treeccb87f6f5df2af15ca2ca8f65e5163b1f34886b8
parent657773c8e59917fda05ee08065ec566aebb50a5f (diff)
downloadhaskell-8b75acd3ca25165536f18976c8d80cb62ad613e4.tar.gz
Make forkProcess work with +RTS -N
Consider this experimental for the time being. There are a lot of things that could go wrong, but I've verified that at least it works on the test cases we have. I also did some API cleanups while I was here. Previously we had: Capability * rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret); but this API is particularly error-prone: if you forget to discard the Capability * you passed in and use the return value instead, then you're in for subtle bugs with +RTS -N later on. So I changed all these functions to this form: void rts_eval (/* inout */ Capability **cap, /* in */ HaskellObj p, /* out */ HaskellObj *ret) It's much harder to use this version incorrectly, because you have to pass the Capability in by reference.
-rw-r--r--compiler/deSugar/DsForeign.lhs4
-rw-r--r--includes/RtsAPI.h48
-rw-r--r--includes/rts/Threads.h5
-rw-r--r--rts/Capability.c17
-rw-r--r--rts/Capability.h13
-rw-r--r--rts/RtsAPI.c64
-rw-r--r--rts/RtsMain.c2
-rw-r--r--rts/RtsStartup.c2
-rw-r--r--rts/Schedule.c268
-rw-r--r--rts/Stable.c2
-rw-r--r--rts/Stable.h5
-rw-r--r--rts/posix/OSThreads.c2
-rw-r--r--rts/posix/Signals.c9
-rw-r--r--rts/posix/Signals.h2
-rw-r--r--rts/sm/GC.c2
15 files changed, 285 insertions, 160 deletions
diff --git a/compiler/deSugar/DsForeign.lhs b/compiler/deSugar/DsForeign.lhs
index 6f9bbc2ef8..30d4af9804 100644
--- a/compiler/deSugar/DsForeign.lhs
+++ b/compiler/deSugar/DsForeign.lhs
@@ -609,8 +609,8 @@ mkFExportCBits dflags c_nm maybe_target arg_htys res_hty is_IO_res_ty cc
, declareCResult
, text "cap = rts_lock();"
-- create the application + perform it.
- , ptext (sLit "cap=rts_evalIO") <> parens (
- cap <>
+ , ptext (sLit "rts_evalIO") <> parens (
+ char '&' <> cap <>
ptext (sLit "rts_apply") <> parens (
cap <>
text "(HaskellObj)"
diff --git a/includes/RtsAPI.h b/includes/RtsAPI.h
index 8d948f9b49..e3b3f7d5f5 100644
--- a/includes/RtsAPI.h
+++ b/includes/RtsAPI.h
@@ -181,32 +181,44 @@ HsBool rts_getBool ( HaskellObj );
The versions ending in '_' allow you to specify an initial stack size.
Note that these calls may cause Garbage Collection, so all HaskellObj
references are rendered invalid by these calls.
+
+ All of these functions take a (Capability **) - there is a
+ Capability pointer both input and output. We use an inout
+ parameter because this is less error-prone for the client than a
+ return value - the client could easily forget to use the return
+ value, whereas incorrectly using an inout parameter will usually
+ result in a type error.
------------------------------------------------------------------------- */
-Capability *
-rts_eval (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
-Capability *
-rts_eval_ (Capability *, HaskellObj p, unsigned int stack_size,
- /*out*/HaskellObj *ret);
+void rts_eval (/* inout */ Capability **,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret);
+
+void rts_eval_ (/* inout */ Capability **,
+ /* in */ HaskellObj p,
+ /* in */ unsigned int stack_size,
+ /* out */ HaskellObj *ret);
-Capability *
-rts_evalIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
+void rts_evalIO (/* inout */ Capability **,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret);
-Capability *
-rts_evalStableIO (Capability *, HsStablePtr s, /*out*/HsStablePtr *ret);
+void rts_evalStableIO (/* inout */ Capability **,
+ /* in */ HsStablePtr s,
+ /* out */ HsStablePtr *ret);
-Capability *
-rts_evalLazyIO (Capability *, HaskellObj p, /*out*/HaskellObj *ret);
+void rts_evalLazyIO (/* inout */ Capability **,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret);
-Capability *
-rts_evalLazyIO_ (Capability *, HaskellObj p, unsigned int stack_size,
- /*out*/HaskellObj *ret);
+void rts_evalLazyIO_ (/* inout */ Capability **,
+ /* in */ HaskellObj p,
+ /* in */ unsigned int stack_size,
+ /* out */ HaskellObj *ret);
-void
-rts_checkSchedStatus (char* site, Capability *);
+void rts_checkSchedStatus (char* site, Capability *);
-SchedulerStatus
-rts_getSchedStatus (Capability *cap);
+SchedulerStatus rts_getSchedStatus (Capability *cap);
/* --------------------------------------------------------------------------
Wrapper closures
diff --git a/includes/rts/Threads.h b/includes/rts/Threads.h
index c974142ce3..d2c4aff984 100644
--- a/includes/rts/Threads.h
+++ b/includes/rts/Threads.h
@@ -20,8 +20,9 @@
//
StgTSO *createThread (Capability *cap, nat stack_size);
-Capability *scheduleWaitThread (StgTSO *tso, /*out*/HaskellObj* ret,
- Capability *cap);
+void scheduleWaitThread (/* in */ StgTSO *tso,
+ /* out */ HaskellObj* ret,
+ /* inout */ Capability **cap);
StgTSO *createGenThread (Capability *cap, nat stack_size,
StgClosure *closure);
diff --git a/rts/Capability.c b/rts/Capability.c
index 26e420970b..4d23f71a86 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -40,8 +40,12 @@ Capability *capabilities = NULL;
// locking, so we don't do that.
Capability *last_free_capability = NULL;
-/* GC indicator, in scope for the scheduler, init'ed to false */
-volatile StgWord waiting_for_gc = 0;
+/*
+ * Indicates that the RTS wants to synchronise all the Capabilities
+ * for some reason. All Capabilities should stop and return to the
+ * scheduler.
+ */
+volatile StgWord pending_sync = 0;
/* Let foreign code get the current Capability -- assuming there is one!
* This is useful for unsafe foreign calls because they are called with
@@ -422,13 +426,12 @@ releaseCapability_ (Capability* cap,
return;
}
- if (waiting_for_gc == PENDING_GC_SEQ) {
+ if (pending_sync == SYNC_GC_SEQ || pending_sync == SYNC_FORK) {
last_free_capability = cap; // needed?
- debugTrace(DEBUG_sched, "GC pending, set capability %d free", cap->no);
+ debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
return;
}
-
// If the next thread on the run queue is a bound thread,
// give this Capability to the appropriate Task.
if (!emptyRunQueue(cap) && cap->run_queue_hd->bound) {
@@ -536,7 +539,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
#endif
/* ----------------------------------------------------------------------------
- * waitForReturnCapability( Task *task )
+ * waitForReturnCapability (Capability **pCap, Task *task)
*
* Purpose: when an OS thread returns from an external call,
* it calls waitForReturnCapability() (via Schedule.resumeThread())
@@ -643,7 +646,7 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
- if (waiting_for_gc == PENDING_GC_PAR) {
+ if (pending_sync == SYNC_GC_PAR) {
traceEventGcStart(cap);
gcWorkerThread(cap);
traceEventGcEnd(cap);
diff --git a/rts/Capability.h b/rts/Capability.h
index 1957487329..033806b3be 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -199,10 +199,15 @@ extern Capability *capabilities;
//
extern Capability *last_free_capability;
-// GC indicator, in scope for the scheduler
-#define PENDING_GC_SEQ 1
-#define PENDING_GC_PAR 2
-extern volatile StgWord waiting_for_gc;
+//
+// Indicates that the RTS wants to synchronise all the Capabilities
+// for some reason. All Capabilities should stop and return to the
+// scheduler.
+//
+#define SYNC_GC_SEQ 1
+#define SYNC_GC_PAR 2
+#define SYNC_FORK 3
+extern volatile StgWord pending_sync;
// Acquires a capability at a return point. If *cap is non-NULL, then
// this is taken as a preference for the Capability we wish to
diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c
index 8fcf8ce812..0463f15ad8 100644
--- a/rts/RtsAPI.c
+++ b/rts/RtsAPI.c
@@ -421,36 +421,39 @@ createStrictIOThread(Capability *cap, nat stack_size, StgClosure *closure)
Evaluating Haskell expressions
------------------------------------------------------------------------- */
-Capability *
-rts_eval (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
+void rts_eval (/* inout */ Capability **cap,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret)
{
StgTSO *tso;
- tso = createGenThread(cap, RtsFlags.GcFlags.initialStkSize, p);
- return scheduleWaitThread(tso,ret,cap);
+ tso = createGenThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
+ scheduleWaitThread(tso,ret,cap);
}
-Capability *
-rts_eval_ (Capability *cap, HaskellObj p, unsigned int stack_size,
- /*out*/HaskellObj *ret)
+void rts_eval_ (/* inout */ Capability **cap,
+ /* in */ HaskellObj p,
+ /* in */ unsigned int stack_size,
+ /* out */ HaskellObj *ret)
{
StgTSO *tso;
- tso = createGenThread(cap, stack_size, p);
- return scheduleWaitThread(tso,ret,cap);
+ tso = createGenThread(*cap, stack_size, p);
+ scheduleWaitThread(tso,ret,cap);
}
/*
* rts_evalIO() evaluates a value of the form (IO a), forcing the action's
* result to WHNF before returning.
*/
-Capability *
-rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
+void rts_evalIO (/* inout */ Capability **cap,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret)
{
StgTSO* tso;
- tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
- return scheduleWaitThread(tso,ret,cap);
+ tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
+ scheduleWaitThread(tso,ret,cap);
}
/*
@@ -459,49 +462,50 @@ rts_evalIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
* action's result to WHNF before returning. The result is returned
* in a StablePtr.
*/
-Capability *
-rts_evalStableIO (Capability *cap, HsStablePtr s, /*out*/HsStablePtr *ret)
+void rts_evalStableIO (/* inout */ Capability **cap,
+ /* in */ HsStablePtr s,
+ /* out */ HsStablePtr *ret)
{
StgTSO* tso;
StgClosure *p, *r;
SchedulerStatus stat;
-
+
p = (StgClosure *)deRefStablePtr(s);
- tso = createStrictIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
+ tso = createStrictIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
// async exceptions are always blocked by default in the created
// thread. See #1048.
tso->flags |= TSO_BLOCKEX | TSO_INTERRUPTIBLE;
- cap = scheduleWaitThread(tso,&r,cap);
- stat = rts_getSchedStatus(cap);
+ scheduleWaitThread(tso,&r,cap);
+ stat = rts_getSchedStatus(*cap);
if (stat == Success && ret != NULL) {
ASSERT(r != NULL);
*ret = getStablePtr((StgPtr)r);
}
-
- return cap;
}
/*
* Like rts_evalIO(), but doesn't force the action's result.
*/
-Capability *
-rts_evalLazyIO (Capability *cap, HaskellObj p, /*out*/HaskellObj *ret)
+void rts_evalLazyIO (/* inout */ Capability **cap,
+ /* in */ HaskellObj p,
+ /* out */ HaskellObj *ret)
{
StgTSO *tso;
- tso = createIOThread(cap, RtsFlags.GcFlags.initialStkSize, p);
- return scheduleWaitThread(tso,ret,cap);
+ tso = createIOThread(*cap, RtsFlags.GcFlags.initialStkSize, p);
+ scheduleWaitThread(tso,ret,cap);
}
-Capability *
-rts_evalLazyIO_ (Capability *cap, HaskellObj p, unsigned int stack_size,
- /*out*/HaskellObj *ret)
+void rts_evalLazyIO_ (/* inout */ Capability **cap,
+ /* in */ HaskellObj p,
+ /* in */ unsigned int stack_size,
+ /* out */ HaskellObj *ret)
{
StgTSO *tso;
- tso = createIOThread(cap, stack_size, p);
- return scheduleWaitThread(tso,ret,cap);
+ tso = createIOThread(*cap, stack_size, p);
+ scheduleWaitThread(tso,ret,cap);
}
/* Convenience function for decoding the returned status. */
diff --git a/rts/RtsMain.c b/rts/RtsMain.c
index 0f6ca82382..2084435f16 100644
--- a/rts/RtsMain.c
+++ b/rts/RtsMain.c
@@ -60,7 +60,7 @@ static void real_main(void)
/* ToDo: want to start with a larger stack size */
{
Capability *cap = rts_lock();
- cap = rts_evalLazyIO(cap,progmain_closure, NULL);
+ rts_evalLazyIO(&cap,progmain_closure, NULL);
status = rts_getSchedStatus(cap);
taskTimeStamp(myTask());
rts_unlock(cap);
diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c
index c451292012..c63f85ee82 100644
--- a/rts/RtsStartup.c
+++ b/rts/RtsStartup.c
@@ -431,7 +431,7 @@ static void flushStdHandles(void)
{
Capability *cap;
cap = rts_lock();
- cap = rts_evalIO(cap, flushStdHandles_closure, NULL);
+ rts_evalIO(&cap, flushStdHandles_closure, NULL);
rts_unlock(cap);
}
diff --git a/rts/Schedule.c b/rts/Schedule.c
index cd704d2871..70f6a3fc00 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -40,6 +40,7 @@
#include "Timer.h"
#include "ThreadPaused.h"
#include "Messages.h"
+#include "Stable.h"
#ifdef HAVE_SYS_TYPES_H
#include <sys/types.h>
@@ -130,6 +131,10 @@ static void scheduleFindWork (Capability *cap);
#if defined(THREADED_RTS)
static void scheduleYield (Capability **pcap, Task *task);
#endif
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type);
+static void acquireAllCapabilities(Capability *cap, Task *task);
+#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleProcessInbox(Capability *cap);
@@ -617,7 +622,7 @@ shouldYieldCapability (Capability *cap, Task *task)
// - the thread at the head of the run queue cannot be run
// by this Task (it is bound to another Task, or it is unbound
// and this task it bound).
- return (waiting_for_gc ||
+ return (pending_sync ||
cap->returning_tasks_hd != NULL ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
@@ -1319,6 +1324,72 @@ scheduleNeedHeapProfile( rtsBool ready_to_gc STG_UNUSED )
}
/* -----------------------------------------------------------------------------
+ * Start a synchronisation of all capabilities
+ * -------------------------------------------------------------------------- */
+
+// Returns:
+// 0 if we successfully got a sync
+// non-0 if there was another sync request in progress,
+// and we yielded to it. The value returned is the
+// type of the other sync request.
+//
+#if defined(THREADED_RTS)
+static nat requestSync (Capability **pcap, Task *task, nat sync_type)
+{
+ nat prev_pending_sync;
+
+ prev_pending_sync = cas(&pending_sync, 0, sync_type);
+
+ if (prev_pending_sync)
+ {
+ do {
+ debugTrace(DEBUG_sched, "someone else is trying to sync (%d)...",
+ prev_pending_sync);
+ ASSERT(*pcap);
+ yieldCapability(pcap,task);
+ } while (pending_sync);
+ return prev_pending_sync; // NOTE: task->cap might have changed now
+ }
+ else
+ {
+ return 0;
+ }
+}
+
+//
+// Grab all the capabilities except the one we already hold. Used
+// when synchronising before a single-threaded GC (SYNC_SEQ_GC), and
+// before a fork (SYNC_FORK).
+//
+// Only call this after requestSync(), otherwise a deadlock might
+// ensue if another thread is trying to synchronise.
+//
+static void acquireAllCapabilities(Capability *cap, Task *task)
+{
+ Capability *tmpcap;
+ nat i;
+
+ for (i=0; i < n_capabilities; i++) {
+ debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities);
+ tmpcap = &capabilities[i];
+ if (tmpcap != cap) {
+ // we better hope this task doesn't get migrated to
+ // another Capability while we're waiting for this one.
+ // It won't, because load balancing happens while we have
+ // all the Capabilities, but even so it's a slightly
+ // unsavoury invariant.
+ task->cap = tmpcap;
+ waitForReturnCapability(&tmpcap, task);
+ if (tmpcap != &capabilities[i]) {
+ barf("acquireAllCapabilities: got the wrong capability");
+ }
+ }
+ }
+}
+
+#endif
+
+/* -----------------------------------------------------------------------------
* Perform a garbage collection if necessary
* -------------------------------------------------------------------------- */
@@ -1327,10 +1398,8 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
{
rtsBool heap_census;
#ifdef THREADED_RTS
- /* extern static volatile StgWord waiting_for_gc;
- lives inside capability.c */
- rtsBool gc_type, prev_pending_gc;
- nat i;
+ rtsBool gc_type;
+ nat i, sync;
#endif
if (sched_state == SCHED_SHUTTING_DOWN) {
@@ -1346,9 +1415,9 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
&& N >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->mark)
{
- gc_type = PENDING_GC_PAR;
+ gc_type = SYNC_GC_PAR;
} else {
- gc_type = PENDING_GC_SEQ;
+ gc_type = SYNC_GC_SEQ;
}
// In order to GC, there must be no threads running Haskell code.
@@ -1363,26 +1432,25 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
//
/* Other capabilities are prevented from running yet more Haskell
- threads if waiting_for_gc is set. Tested inside
+ threads if pending_sync is set. Tested inside
yieldCapability() and releaseCapability() in Capability.c */
- prev_pending_gc = cas(&waiting_for_gc, 0, gc_type);
- if (prev_pending_gc) {
- do {
- debugTrace(DEBUG_sched, "someone else is trying to GC (%d)...",
- prev_pending_gc);
- ASSERT(cap);
- yieldCapability(&cap,task);
- } while (waiting_for_gc);
- return cap; // NOTE: task->cap might have changed here
- }
+ do {
+ sync = requestSync(&cap, task, gc_type);
+ if (sync == SYNC_GC_SEQ || sync == SYNC_GC_PAR) {
+ // someone else had a pending sync request for a GC, so
+ // let's assume GC has been done and we don't need to GC
+ // again.
+ return cap;
+ }
+ } while (sync);
interruptAllCapabilities();
// The final shutdown GC is always single-threaded, because it's
// possible that some of the Capabilities have no worker threads.
- if (gc_type == PENDING_GC_SEQ)
+ if (gc_type == SYNC_GC_SEQ)
{
traceEventRequestSeqGc(cap);
}
@@ -1392,25 +1460,10 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
debugTrace(DEBUG_sched, "ready_to_gc, grabbing GC threads");
}
- if (gc_type == PENDING_GC_SEQ)
+ if (gc_type == SYNC_GC_SEQ)
{
// single-threaded GC: grab all the capabilities
- for (i=0; i < n_capabilities; i++) {
- debugTrace(DEBUG_sched, "ready_to_gc, grabbing all the capabilies (%d/%d)", i, n_capabilities);
- if (cap != &capabilities[i]) {
- Capability *pcap = &capabilities[i];
- // we better hope this task doesn't get migrated to
- // another Capability while we're waiting for this one.
- // It won't, because load balancing happens while we have
- // all the Capabilities, but even so it's a slightly
- // unsavoury invariant.
- task->cap = pcap;
- waitForReturnCapability(&pcap, task);
- if (pcap != &capabilities[i]) {
- barf("scheduleDoGC: got the wrong capability");
- }
- }
- }
+ acquireAllCapabilities(cap,task);
}
else
{
@@ -1455,9 +1508,9 @@ delete_threads_and_gc:
traceEventGcStart(cap);
#if defined(THREADED_RTS)
- // reset waiting_for_gc *before* GC, so that when the GC threads
+ // reset pending_sync *before* GC, so that when the GC threads
// emerge they don't immediately re-enter the GC.
- waiting_for_gc = 0;
+ pending_sync = 0;
GarbageCollect(force_major || heap_census, heap_census, gc_type, cap);
#else
GarbageCollect(force_major || heap_census, heap_census, 0, cap);
@@ -1494,7 +1547,7 @@ delete_threads_and_gc:
}
#if defined(THREADED_RTS)
- if (gc_type == PENDING_GC_PAR)
+ if (gc_type == SYNC_GC_PAR)
{
releaseGCThreads(cap);
}
@@ -1526,7 +1579,7 @@ delete_threads_and_gc:
#endif
#if defined(THREADED_RTS)
- if (gc_type == PENDING_GC_SEQ) {
+ if (gc_type == SYNC_GC_SEQ) {
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
if (cap != &capabilities[i]) {
@@ -1561,26 +1614,41 @@ forkProcess(HsStablePtr *entry
StgTSO* t,*next;
Capability *cap;
nat g;
-
-#if defined(THREADED_RTS)
- if (RtsFlags.ParFlags.nNodes > 1) {
- errorBelch("forking not supported with +RTS -N<n> greater than 1");
- stg_exit(EXIT_FAILURE);
- }
+ Task *task = NULL;
+ nat i;
+#ifdef THREADED_RTS
+ nat sync;
#endif
debugTrace(DEBUG_sched, "forking!");
- // ToDo: for SMP, we should probably acquire *all* the capabilities
- cap = rts_lock();
-
+ task = newBoundTask();
+
+ cap = NULL;
+ waitForReturnCapability(&cap, task);
+
+#ifdef THREADED_RTS
+ do {
+ sync = requestSync(&cap, task, SYNC_FORK);
+ } while (sync);
+
+ acquireAllCapabilities(cap,task);
+
+ pending_sync = 0;
+#endif
+
// no funny business: hold locks while we fork, otherwise if some
// other thread is holding a lock when the fork happens, the data
// structure protected by the lock will forever be in an
// inconsistent state in the child. See also #1391.
ACQUIRE_LOCK(&sched_mutex);
- ACQUIRE_LOCK(&cap->lock);
- ACQUIRE_LOCK(&cap->running_task->lock);
+ ACQUIRE_LOCK(&sm_mutex);
+ ACQUIRE_LOCK(&stable_mutex);
+ ACQUIRE_LOCK(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ ACQUIRE_LOCK(&capabilities[i].lock);
+ }
stopTimer(); // See #4074
@@ -1595,19 +1663,30 @@ forkProcess(HsStablePtr *entry
startTimer(); // #4074
RELEASE_LOCK(&sched_mutex);
- RELEASE_LOCK(&cap->lock);
- RELEASE_LOCK(&cap->running_task->lock);
+ RELEASE_LOCK(&sm_mutex);
+ RELEASE_LOCK(&stable_mutex);
+ RELEASE_LOCK(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ releaseCapability_(&capabilities[i],rtsFalse);
+ RELEASE_LOCK(&capabilities[i].lock);
+ }
+ boundTaskExiting(task);
// just return the pid
- rts_unlock(cap);
- return pid;
+ return pid;
} else { // child
#if defined(THREADED_RTS)
initMutex(&sched_mutex);
- initMutex(&cap->lock);
- initMutex(&cap->running_task->lock);
+ initMutex(&sm_mutex);
+ initMutex(&stable_mutex);
+ initMutex(&task->lock);
+
+ for (i=0; i < n_capabilities; i++) {
+ initMutex(&capabilities[i].lock);
+ }
#endif
#ifdef TRACING
@@ -1626,7 +1705,7 @@ forkProcess(HsStablePtr *entry
// don't allow threads to catch the ThreadKilled
// exception, but we do want to raiseAsync() because these
// threads may be evaluating thunks that we need later.
- deleteThread_(cap,t);
+ deleteThread_(t->cap,t);
// stop the GC from updating the InCall to point to
// the TSO. This is only necessary because the
@@ -1637,44 +1716,58 @@ forkProcess(HsStablePtr *entry
}
}
- // Empty the run queue. It seems tempting to let all the
- // killed threads stay on the run queue as zombies to be
- // cleaned up later, but some of them correspond to bound
- // threads for which the corresponding Task does not exist.
- cap->run_queue_hd = END_TSO_QUEUE;
- cap->run_queue_tl = END_TSO_QUEUE;
-
- // Any suspended C-calling Tasks are no more, their OS threads
- // don't exist now:
- cap->suspended_ccalls = NULL;
-
- // Empty the threads lists. Otherwise, the garbage
+ discardTasksExcept(task);
+
+ for (i=0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+
+ // Empty the run queue. It seems tempting to let all the
+ // killed threads stay on the run queue as zombies to be
+ // cleaned up later, but some of them may correspond to
+ // bound threads for which the corresponding Task does not
+ // exist.
+ cap->run_queue_hd = END_TSO_QUEUE;
+ cap->run_queue_tl = END_TSO_QUEUE;
+
+ // Any suspended C-calling Tasks are no more, their OS threads
+ // don't exist now:
+ cap->suspended_ccalls = NULL;
+
+#if defined(THREADED_RTS)
+ // Wipe our spare workers list, they no longer exist. New
+ // workers will be created if necessary.
+ cap->spare_workers = NULL;
+ cap->n_spare_workers = 0;
+ cap->returning_tasks_hd = NULL;
+ cap->returning_tasks_tl = NULL;
+#endif
+
+ // Release all caps except 0, we'll use that for starting
+ // the IO manager and running the client action below.
+ if (cap->no != 0) {
+ task->cap = cap;
+ releaseCapability(cap);
+ }
+ }
+ cap = &capabilities[0];
+ task->cap = cap;
+
+ // Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
for (g = 0; g < RtsFlags.GcFlags.generations; g++) {
generations[g].threads = END_TSO_QUEUE;
}
- discardTasksExcept(cap->running_task);
-
-#if defined(THREADED_RTS)
- // Wipe our spare workers list, they no longer exist. New
- // workers will be created if necessary.
- cap->spare_workers = NULL;
- cap->n_spare_workers = 0;
- cap->returning_tasks_hd = NULL;
- cap->returning_tasks_tl = NULL;
-#endif
-
// On Unix, all timers are reset in the child, so we need to start
// the timer again.
initTimer();
startTimer();
#if defined(THREADED_RTS)
- cap = ioManagerStartCap(cap);
+ ioManagerStartCap(&cap);
#endif
- cap = rts_evalStableIO(cap, entry, NULL); // run the action
+ rts_evalStableIO(&cap, entry, NULL); // run the action
rts_checkSchedStatus("forkProcess",cap);
rts_unlock(cap);
@@ -1928,11 +2021,14 @@ scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
#endif
}
-Capability *
-scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
+void
+scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
{
Task *task;
DEBUG_ONLY( StgThreadID id );
+ Capability *cap;
+
+ cap = *pcap;
// We already created/initialised the Task
task = cap->running_task;
@@ -1957,7 +2053,7 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
debugTrace(DEBUG_sched, "bound thread (%lu) finished", (unsigned long)id);
- return cap;
+ *pcap = cap;
}
/* ----------------------------------------------------------------------------
diff --git a/rts/Stable.c b/rts/Stable.c
index c46f8b2b9e..39b26173d8 100644
--- a/rts/Stable.c
+++ b/rts/Stable.c
@@ -77,7 +77,7 @@ static snEntry *stable_ptr_free = NULL;
static unsigned int SPT_size = 0;
#ifdef THREADED_RTS
-static Mutex stable_mutex;
+Mutex stable_mutex;
#endif
static void enlargeStablePtrTable(void);
diff --git a/rts/Stable.h b/rts/Stable.h
index d7b7f8bb1e..bec932af97 100644
--- a/rts/Stable.h
+++ b/rts/Stable.h
@@ -33,6 +33,11 @@ void updateStablePtrTable ( rtsBool full );
void stablePtrPreGC ( void );
void stablePtrPostGC ( void );
+#ifdef THREADED_RTS
+// needed by Schedule.c:forkProcess()
+extern Mutex stable_mutex;
+#endif
+
#include "EndPrivate.h"
#endif /* STABLE_H */
diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c
index cc3baeb1bb..e25cdc1e81 100644
--- a/rts/posix/OSThreads.c
+++ b/rts/posix/OSThreads.c
@@ -197,7 +197,7 @@ forkOS_createThreadWrapper ( void * entry )
{
Capability *cap;
cap = rts_lock();
- cap = rts_evalStableIO(cap, (HsStablePtr) entry, NULL);
+ rts_evalStableIO(&cap, (HsStablePtr) entry, NULL);
taskTimeStamp(myTask());
rts_unlock(cap);
return NULL;
diff --git a/rts/posix/Signals.c b/rts/posix/Signals.c
index 9f5bf9f370..38c9792552 100644
--- a/rts/posix/Signals.c
+++ b/rts/posix/Signals.c
@@ -145,11 +145,10 @@ ioManagerDie (void)
}
}
-Capability *
-ioManagerStartCap (Capability *cap)
+void
+ioManagerStartCap (Capability **cap)
{
- return rts_evalIO(
- cap,&base_GHCziConcziIO_ensureIOManagerIsRunning_closure,NULL);
+ rts_evalIO(cap,&base_GHCziConcziIO_ensureIOManagerIsRunning_closure,NULL);
}
void
@@ -159,7 +158,7 @@ ioManagerStart (void)
Capability *cap;
if (io_manager_control_fd < 0 || io_manager_wakeup_fd < 0) {
cap = rts_lock();
- cap = ioManagerStartCap(cap);
+ ioManagerStartCap(&cap);
rts_unlock(cap);
}
}
diff --git a/rts/posix/Signals.h b/rts/posix/Signals.h
index 7235559915..387d688912 100644
--- a/rts/posix/Signals.h
+++ b/rts/posix/Signals.h
@@ -24,7 +24,7 @@ extern siginfo_t *next_pending_handler;
void startSignalHandlers(Capability *cap);
#endif
-Capability *ioManagerStartCap (Capability *cap);
+void ioManagerStartCap (/* inout */ Capability **cap);
extern StgInt *signal_handlers;
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index 88d5a02f89..a4ac1fb01d 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -259,7 +259,7 @@ GarbageCollect (rtsBool force_major_gc,
* We don't try to parallelise minor GCs (unless the user asks for
* it with +RTS -gn0), or mark/compact/sweep GC.
*/
- if (gc_type == PENDING_GC_PAR) {
+ if (gc_type == SYNC_GC_PAR) {
n_gc_threads = RtsFlags.ParFlags.nNodes;
} else {
n_gc_threads = 1;