summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
authorSimon Marlow <simonmar@microsoft.com>2006-03-27 12:41:51 +0000
committerSimon Marlow <simonmar@microsoft.com>2006-03-27 12:41:51 +0000
commitc520a3a2752ffcec5710a88a8a2e219c20edfc8a (patch)
tree6bdf5c64fd866885247ae89d04f80f4ead7a29ce
parent5ed93b107550cc10fda7ae187de65da1a4b24d87 (diff)
downloadhaskell-c520a3a2752ffcec5710a88a8a2e219c20edfc8a.tar.gz
Add a new primitive forkOn#, for forking a thread on a specific Capability
This gives some control over affinity, while we figure out the best way to automatically schedule threads to make best use of the available parallelism. In addition to the primitive, there is also: GHC.Conc.forkOnIO :: Int -> IO () -> IO ThreadId where 'forkOnIO i m' creates a thread on Capability (i `rem` N), where N is the number of available Capabilities set by +RTS -N. Threads forked by forkOnIO do not automatically migrate when there are free Capabilities, like normal threads do. Still, if you're using forkOnIO exclusively, it's a good idea to do +RTS -qm to disable work pushing anyway (work pushing takes too much time when the run queues are large; this is something we need to fix).
-rw-r--r--ghc/compiler/prelude/primops.txt.pp7
-rw-r--r--ghc/includes/StgMiscClosures.h1
-rw-r--r--ghc/includes/TSO.h8
-rw-r--r--ghc/rts/PrimOps.cmm38
-rw-r--r--ghc/rts/Schedule.c73
-rw-r--r--ghc/rts/Schedule.h7
6 files changed, 110 insertions, 24 deletions
diff --git a/ghc/compiler/prelude/primops.txt.pp b/ghc/compiler/prelude/primops.txt.pp
index ecde8821eb..13b4b6c97d 100644
--- a/ghc/compiler/prelude/primops.txt.pp
+++ b/ghc/compiler/prelude/primops.txt.pp
@@ -1441,6 +1441,13 @@ primop ForkOp "fork#" GenPrimOp
has_side_effects = True
out_of_line = True
+primop ForkOnOp "forkOn#" GenPrimOp
+ Int# -> a -> State# RealWorld -> (# State# RealWorld, ThreadId# #)
+ with
+ usage = { mangle ForkOnOp [mkO, mkP] mkR }
+ has_side_effects = True
+ out_of_line = True
+
primop KillThreadOp "killThread#" GenPrimOp
ThreadId# -> a -> State# RealWorld -> State# RealWorld
with
diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h
index 62a7ed33de..4a6a7c47c2 100644
--- a/ghc/includes/StgMiscClosures.h
+++ b/ghc/includes/StgMiscClosures.h
@@ -579,6 +579,7 @@ RTS_FUN(makeStablePtrzh_fast);
RTS_FUN(deRefStablePtrzh_fast);
RTS_FUN(forkzh_fast);
+RTS_FUN(forkOnzh_fast);
RTS_FUN(yieldzh_fast);
RTS_FUN(killThreadzh_fast);
RTS_FUN(blockAsyncExceptionszh_fast);
diff --git a/ghc/includes/TSO.h b/ghc/includes/TSO.h
index 14c47abed4..d096d401cf 100644
--- a/ghc/includes/TSO.h
+++ b/ghc/includes/TSO.h
@@ -93,7 +93,13 @@ typedef StgWord32 StgThreadID;
*/
#define TSO_DIRTY 1
-#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
+/*
+ * TSO_LOCKED is set when a TSO is locked to a particular Capability.
+ */
+#define TSO_LOCKED 2
+
+#define tsoDirty(tso) ((tso)->flags & TSO_DIRTY)
+#define tsoLocked(tso) ((tso)->flags & TSO_LOCKED)
/*
* Type returned after running a thread. Values of this type
diff --git a/ghc/rts/PrimOps.cmm b/ghc/rts/PrimOps.cmm
index 23bc22efd1..f1c214e304 100644
--- a/ghc/rts/PrimOps.cmm
+++ b/ghc/rts/PrimOps.cmm
@@ -876,19 +876,45 @@ decodeDoublezh_fast
forkzh_fast
{
/* args: R1 = closure to spark */
-
+
MAYBE_GC(R1_PTR, forkzh_fast);
- // create it right now, return ThreadID in R1
- "ptr" R1 = foreign "C" createIOThread( MyCapability() "ptr",
+ W_ closure;
+ W_ threadid;
+ closure = R1;
+
+ "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
+ RtsFlags_GcFlags_initialStkSize(RtsFlags),
+ closure "ptr") [];
+ foreign "C" scheduleThread(MyCapability() "ptr", threadid "ptr") [];
+
+ // switch at the earliest opportunity
+ CInt[context_switch] = 1 :: CInt;
+
+ RET_P(threadid);
+}
+
+forkOnzh_fast
+{
+ /* args: R1 = cpu, R2 = closure to spark */
+
+ MAYBE_GC(R2_PTR, forkOnzh_fast);
+
+ W_ cpu;
+ W_ closure;
+ W_ threadid;
+ cpu = R1;
+ closure = R2;
+
+ "ptr" threadid = foreign "C" createIOThread( MyCapability() "ptr",
RtsFlags_GcFlags_initialStkSize(RtsFlags),
- R1 "ptr") [R1];
- foreign "C" scheduleThread(MyCapability() "ptr", R1 "ptr") [R1];
+ closure "ptr") [];
+ foreign "C" scheduleThreadOn(MyCapability() "ptr", cpu, threadid "ptr") [];
// switch at the earliest opportunity
CInt[context_switch] = 1 :: CInt;
- RET_P(R1);
+ RET_P(threadid);
}
yieldzh_fast
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index d49d4ed8e5..52fd4d5df6 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -204,6 +204,7 @@ static void schedulePushWork(Capability *cap, Task *task);
#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
+static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
#if defined(GRAN)
@@ -482,20 +483,7 @@ schedule (Capability *initialCapability, Task *task)
// list each time around the scheduler.
if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
- // Any threads that were woken up by other Capabilities get
- // appended to our run queue.
- if (!emptyWakeupQueue(cap)) {
- ACQUIRE_LOCK(&cap->lock);
- if (emptyRunQueue(cap)) {
- cap->run_queue_hd = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- } else {
- cap->run_queue_tl->link = cap->wakeup_queue_hd;
- cap->run_queue_tl = cap->wakeup_queue_tl;
- }
- cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
- RELEASE_LOCK(&cap->lock);
- }
+ scheduleCheckWakeupThreads(cap);
scheduleCheckBlockedThreads(cap);
@@ -841,7 +829,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
next = t->link;
t->link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
- || t->bound == task) { // don't move my bound thread
+ || t->bound == task // don't move my bound thread
+ || tsoLocked(t)) { // don't move a locked thread
prev->link = t;
prev = t;
} else if (i == n_free_caps) {
@@ -928,6 +917,31 @@ scheduleCheckBlockedThreads(Capability *cap USED_IF_NOT_THREADS)
/* ----------------------------------------------------------------------------
+ * Check for threads woken up by other Capabilities
+ * ------------------------------------------------------------------------- */
+
+static void
+scheduleCheckWakeupThreads(Capability *cap USED_IF_THREADS)
+{
+#if defined(THREADED_RTS)
+ // Any threads that were woken up by other Capabilities get
+ // appended to our run queue.
+ if (!emptyWakeupQueue(cap)) {
+ ACQUIRE_LOCK(&cap->lock);
+ if (emptyRunQueue(cap)) {
+ cap->run_queue_hd = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ } else {
+ cap->run_queue_tl->link = cap->wakeup_queue_hd;
+ cap->run_queue_tl = cap->wakeup_queue_tl;
+ }
+ cap->wakeup_queue_hd = cap->wakeup_queue_tl = END_TSO_QUEUE;
+ RELEASE_LOCK(&cap->lock);
+ }
+#endif
+}
+
+/* ----------------------------------------------------------------------------
* Check for threads blocked on BLACKHOLEs that can be woken up
* ------------------------------------------------------------------------- */
static void
@@ -2709,6 +2723,28 @@ scheduleThread(Capability *cap, StgTSO *tso)
appendToRunQueue(cap,tso);
}
+void
+scheduleThreadOn(Capability *cap, StgWord cpu USED_IF_THREADS, StgTSO *tso)
+{
+#if defined(THREADED_RTS)
+ tso->flags |= TSO_LOCKED; // we requested explicit affinity; don't
+ // move this thread from now on.
+ cpu %= RtsFlags.ParFlags.nNodes;
+ if (cpu == cap->no) {
+ appendToRunQueue(cap,tso);
+ } else {
+ Capability *target_cap = &capabilities[cpu];
+ if (tso->bound) {
+ tso->bound->cap = target_cap;
+ }
+ tso->cap = target_cap;
+ wakeupThreadOnCapability(target_cap,tso);
+ }
+#else
+ appendToRunQueue(cap,tso);
+#endif
+}
+
Capability *
scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
{
@@ -3244,7 +3280,8 @@ unblockOne(Capability *cap, StgTSO *tso)
next = tso->link;
tso->link = END_TSO_QUEUE;
- if (RtsFlags.ParFlags.wakeupMigrate || tso->cap == cap) {
+#if defined(THREADED_RTS)
+ if (tso->cap == cap || (!tsoLocked(tso) && RtsFlags.ParFlags.wakeupMigrate)) {
// We are waking up this thread on the current Capability, which
// might involve migrating it from the Capability it was last on.
if (tso->bound) {
@@ -3260,6 +3297,10 @@ unblockOne(Capability *cap, StgTSO *tso)
// we'll try to wake it up on the Capability it was last on.
wakeupThreadOnCapability(tso->cap, tso);
}
+#else
+ appendToRunQueue(cap,tso);
+ context_switch = 1;
+#endif
IF_DEBUG(scheduler,sched_belch("waking up thread %ld on cap %d", (long)tso->id, tso->cap->no));
return next;
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index d11162e47d..37b07941f4 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -20,9 +20,14 @@
void initScheduler (void);
void exitScheduler (void);
-// Place a new thread on the run queue of the specified Capability
+// Place a new thread on the run queue of the current Capability
void scheduleThread (Capability *cap, StgTSO *tso);
+// Place a new thread on the run queue of a specified Capability
+// (cap is the currently owned Capability, cpu is the number of
+// the desired Capability).
+void scheduleThreadOn(Capability *cap, StgWord cpu, StgTSO *tso);
+
/* awakenBlockedQueue()
*
* Takes a pointer to the beginning of a blocked TSO queue, and