| author | Simon Marlow <simonmar@microsoft.com> | 2006-03-27 12:41:51 +0000 |
|---|---|---|
| committer | Simon Marlow <simonmar@microsoft.com> | 2006-03-27 12:41:51 +0000 |
| commit | c520a3a2752ffcec5710a88a8a2e219c20edfc8a (patch) | |
| tree | 6bdf5c64fd866885247ae89d04f80f4ead7a29ce | |
| parent | 5ed93b107550cc10fda7ae187de65da1a4b24d87 (diff) | |
| download | haskell-c520a3a2752ffcec5710a88a8a2e219c20edfc8a.tar.gz | |
Add a new primitive forkOn#, for forking a thread on a specific Capability
This gives some control over affinity, while we figure out the best
way to automatically schedule threads to make best use of the
available parallelism.
In addition to the primitive, there is also:
GHC.Conc.forkOnIO :: Int -> IO () -> IO ThreadId
where 'forkOnIO i m' creates a thread on Capability (i `rem` N), where
N is the number of available Capabilities set by +RTS -N.
Threads forked by forkOnIO do not automatically migrate to free Capabilities
the way ordinary threads do. Even so, if you're using forkOnIO exclusively,
it's a good idea to pass +RTS -qm to disable work pushing (work pushing takes
too much time when the run queues are large; this is something we need to fix).
| mode | file | changes |
|---|---|---|
| -rw-r--r-- | ghc/compiler/prelude/primops.txt.pp | 7 |
| -rw-r--r-- | ghc/includes/StgMiscClosures.h | 1 |
| -rw-r--r-- | ghc/includes/TSO.h | 8 |
| -rw-r--r-- | ghc/rts/PrimOps.cmm | 38 |
| -rw-r--r-- | ghc/rts/Schedule.c | 73 |
| -rw-r--r-- | ghc/rts/Schedule.h | 7 |
6 files changed, 110 insertions, 24 deletions
diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp
index ecde8821eb..13b4b6c97d 100644
--- a/ghc/compiler/prelude/primops.txt.pp
+++ b/ghc/compiler/prelude/primops.txt.pp
@@ -1441,6 +1441,13 @@ primop  ForkOp "fork#" GenPrimOp
    has_side_effects = True
    out_of_line = True
 
+primop  ForkOnOp "forkOn#" GenPrimOp
+   Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
+   with
+   usage = { mangle ForkOnOp [mkO, mkP] mkR }
+   has_side_effects = True
+   out_of_line = True
+
 primop  KillThreadOp "killThread#" GenPrimOp
    ThreadId# -> a -> State# RealWorld -> State# RealWorld
    with
diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h
index 62a7ed33de..4a6a7c47c2 100644
--- a/ghc/includes/StgMiscClosures.h
+++ b/ghc/includes/StgMiscClosures.h
@@ -579,6 +579,7 @@ RTS_FUN(makeStablePtrzh_fast);
 RTS_FUN(deRefStablePtrzh_fast);
 
 RTS_FUN(forkzh_fast);
+RTS_FUN(forkOnzh_fast);
 RTS_FUN(yieldzh_fast);
 RTS_FUN(killThreadzh_fast);
 RTS_FUN(blockAsyncExceptionszh_fast);
diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h
index 14c47abed4..d096d401cf 100644
--- a/ghc/includes/TSO.h
+++ b/ghc/includes/TSO.h
@@ -93,7 +93,13 @@ typedef StgWord32 StgThreadID;
  */
 #define TSO_DIRTY  1
 
-#define tsoDirty(tso)  ((tso)->flags & TSO_DIRTY)
+/*
+ * TSO_LOCKED is set when a TSO is locked to a particular Capability.
+ */
+#define TSO_LOCKED 2
+
+#define tsoDirty(tso)  ((tso)->flags & TSO_DIRTY)
+#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
 
 /*
  * Type returned after running a thread. Values of this type
diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm
index 23bc22efd1..f1c214e304 100644
--- a/ghc/rts/PrimOps.cmm
+++ b/ghc/rts/PrimOps.cmm
@@ -876,19 +876,45 @@ decodeDoublezh_fast
 forkzh_fast
 {
   /* args: R1 = closure to spark */
-  
+
   MAYBE_GC(R1_PTR, forkzh_fast);
 
-  // create it right now, return ThreadID in R1
-  "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr",
+  W_ closure;
+  W_ threadid;
+  closure = R1;
+
+  "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
+                                RtsFlags_GcFlags_initialStkSize(RtsFlags),
+                                closure "ptr") [];
+  foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
+
+  // switch at the earliest opportunity
+  CInt[context_switch] = 1 :: CInt;
+
+  RET_P(threadid);
+}
+
+forkOnzh_fast
+{
+  /* args: R1 = cpu, R2 = closure to spark */
+
+  MAYBE_GC(R2_PTR, forkOnzh_fast);
+
+  W_ cpu;
+  W_ closure;
+  W_ threadid;
+  cpu = R1;
+  closure = R2;
+
+  "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
                                 RtsFlags_GcFlags_initialStkSize(RtsFlags),
-                                R1 "ptr") [R1];
-  foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1];
+                                closure "ptr") [];
+  foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
 
   // switch at the earliest opportunity
   CInt[context_switch] = 1 :: CInt;
 
-  RET_P(R1);
+  RET_P(threadid);
 }
 
 yieldzh_fast
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index d49d4ed8e5..52fd4d5df6 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task);
 #endif
 static void scheduleStartSignalHandlers (Capability *cap);
 static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
 static void scheduleCheckBlackHoles (Capability *cap);
 static void scheduleDetectDeadlock (Capability *cap, Task *task);
 #if defined(GRAN)
@@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task)
     // list each time around the scheduler.
     if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
 
-    // Any threads that were woken up by other Capabilities get
-    // appended to our run queue.
-    if (!emptyWakeupQueue(cap)) {
-        ACQUIRE_LOCK(&cap->lock);
-        if (emptyRunQueue(cap)) {
-            cap->run_queue_hd = cap->wakeup_queue_hd;
-            cap->run_queue_tl = cap->wakeup_queue_tl;
-        } else {
-            cap->run_queue_tl->link = cap->wakeup_queue_hd;
-            cap->run_queue_tl = cap->wakeup_queue_tl;
-        }
-        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
-        RELEASE_LOCK(&cap->lock);
-    }
+    scheduleCheckWakeupThreads(cap);
 
     scheduleCheckBlockedThreads(cap);
 
@@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
            next = t->link;
            t->link = END_TSO_QUEUE;
            if (t->what_next == ThreadRelocated
-               || t->bound == task) { // don't move my bound thread
+               || t->bound == task // don't move my bound thread
+               || tsoLocked(t)) {  // don't move a locked thread
                prev->link = t;
                prev = t;
            } else if (i == n_free_caps) {
@@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
 
 
 /* ----------------------------------------------------------------------------
+ * Check for threads woken up by other Capabilities
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+    // Any threads that were woken up by other Capabilities get
+    // appended to our run queue.
+    if (!emptyWakeupQueue(cap)) {
+        ACQUIRE_LOCK(&cap->lock);
+        if (emptyRunQueue(cap)) {
+            cap->run_queue_hd = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        } else {
+            cap->run_queue_tl->link = cap->wakeup_queue_hd;
+            cap->run_queue_tl = cap->wakeup_queue_tl;
+        }
+        cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+        RELEASE_LOCK(&cap->lock);
+    }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
  * Check for threads blocked on BLACKHOLEs that can be woken up
  * ------------------------------------------------------------------------- */
 static void
@@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso)
     appendToRunQueue(cap,tso);
 }
 
+void
+scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
+{
+#if defined(THREADED_RTS)
+    tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
+                              // move this thread from now on.
+    cpu %= RtsFlags.ParFlags.nNodes;
+    if (cpu == cap->no) {
+        appendToRunQueue(cap,tso);
+    } else {
+        Capability *target_cap = &capabilities[cpu];
+        if (tso->bound) {
+            tso->bound->cap = target_cap;
+        }
+        tso->cap = target_cap;
+        wakeupThreadOnCapability(target_cap,tso);
+    }
+#else
+    appendToRunQueue(cap,tso);
+#endif
+}
+
 Capability *
 scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
 {
@@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso)
     next = tso->link;
     tso->link = END_TSO_QUEUE;
 
-    if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+#if defined(THREADED_RTS)
+    if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
        // We are waking up this thread on the current Capability, which
        // might involve migrating it from the Capability it was last on.
        if (tso->bound) {
@@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso)
        // we'll try to wake it up on the Capability it was last on.
        wakeupThreadOnCapability(tso->cap, tso);
     }
+#else
+    appendToRunQueue(cap,tso);
+    context_switch = 1;
+#endif
     IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d",
                                    (long)tso->id, tso->cap->no));
     return next;
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index d11162e47d..37b07941f4 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -20,9 +20,14 @@ void initScheduler (void);
 void exitScheduler (void);
 
-// Place a new thread on the run queue of the specified Capability
+// Place a new thread on the run queue of the current Capability
 void scheduleThread (Capability *cap, StgTSO *tso);
 
+// Place a new thread on the run queue of a specified Capability
+// (cap is the currently owned Capability, cpu is the number of
+// the desired Capability).
+void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
+
 /* awakenBlockedQueue()
  *
  * Takes a pointer to the beginning of a blocked TSO queue, and
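As a reading aid, here is a tiny Haskell model (not RTS code) of the Capability-selection
rule that scheduleThreadOn implements above: the requested cpu is reduced modulo the
+RTS -N value, the thread is queued locally if that lands on the current Capability, and
is otherwise handed to the target Capability's wakeup queue. The names Placement,
placeThread, nNodes and myCapNo are invented for this sketch.

```haskell
-- Model of the THREADED_RTS branch of scheduleThreadOn.
data Placement
  = RunHere            -- appendToRunQueue on the current Capability
  | WakeupOn Int       -- wakeupThreadOnCapability on Capability n
  deriving (Show, Eq)

placeThread :: Int  -- ^ nNodes: number of Capabilities (+RTS -N)
            -> Int  -- ^ myCapNo: the Capability executing forkOn#
            -> Int  -- ^ cpu argument given to forkOn# / forkOnIO
            -> Placement
placeThread nNodes myCapNo cpu
  | target == myCapNo = RunHere
  | otherwise         = WakeupOn target
  where
    target = cpu `rem` nNodes  -- mirrors 'cpu %= RtsFlags.ParFlags.nNodes'

main :: IO ()
main = mapM_ (print . placeThread 2 0) [0 .. 3]
-- prints: RunHere, WakeupOn 1, RunHere, WakeupOn 1
```

Because scheduleThreadOn also sets TSO_LOCKED, the placement computed here is permanent:
both schedulePushWork and unblockOne skip locked threads when deciding to migrate.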