 ghc/includes/Closures.h        |   5
 ghc/includes/Regs.h            |  32
 ghc/includes/RtsFlags.h        |   1
 ghc/includes/StgMiscClosures.h |   2
 ghc/includes/Storage.h         |   2
 ghc/rts/Capability.c           |   5
 ghc/rts/GC.c                   | 226
 ghc/rts/Interpreter.c          |   2
 ghc/rts/RtsFlags.c             |   5
 ghc/rts/Schedule.c             | 321
 ghc/rts/Schedule.h             |   8
 ghc/rts/Sparks.c               | 388
 ghc/rts/Sparks.h               |  76
 ghc/rts/StgCRun.c              |   3
 ghc/rts/StgStartup.cmm         |   4
 ghc/rts/Updates.cmm            |  14
 ghc/rts/Updates.h              |  15
 17 files changed, 618 insertions(+), 491 deletions(-)
diff --git a/ghc/includes/Closures.h b/ghc/includes/Closures.h
index f9bfeb4dc1..df3bca32f7 100644
--- a/ghc/includes/Closures.h
+++ b/ghc/includes/Closures.h
@@ -339,6 +339,11 @@ typedef struct {
* - In StgTRecHeader, it might be worthwhile having separate chunks
* of read-only and read-write locations. This would save a
* new_value field in the read-only locations.
+ *
+ * - In StgAtomicallyFrame, we could combine the waiting bit into
+ * the header (maybe a different info tbl for a waiting transaction).
+ * This means we can specialise the code for the atomically frame
+ * (it immediately switches on frame->waiting anyway).
*/
typedef struct StgTVarWaitQueue_ {
diff --git a/ghc/includes/Regs.h b/ghc/includes/Regs.h
index f1b8597d60..def36c327d 100644
--- a/ghc/includes/Regs.h
+++ b/ghc/includes/Regs.h
@@ -24,15 +24,17 @@
#include "gmp.h" // Needs MP_INT definition
-/*
- * This is the table that holds shadow-locations for all the STG
- * registers. The shadow locations are used when:
- *
- * 1) the particular register isn't mapped to a real machine
- * register, probably because there's a shortage of real registers.
- * 2) caller-saves registers are saved across a CCall
+/*
+ * Spark pools: used to store pending sparks (SMP & PARALLEL_HASKELL only)
+ * This is a circular buffer. Invariants:
+ * - base <= hd < lim
+ * - base <= tl < lim
+ * - if hd==tl, then the pool is empty.
+ * - if hd == tl+1, then the pool is full.
+ * Adding to the pool is done by assigning to *tl++ (wrapping round as
+ * necessary). When adding to a full pool, we have the option of
+ * throwing away either the oldest (hd++) or the most recent (tl--) entry.
*/
-
typedef struct StgSparkPool_ {
StgClosure **base;
StgClosure **lim;
@@ -40,6 +42,12 @@ typedef struct StgSparkPool_ {
StgClosure **tl;
} StgSparkPool;
+#define ASSERT_SPARK_POOL_INVARIANTS(p) \
+ ASSERT((p)->base <= (p)->hd); \
+ ASSERT((p)->hd < (p)->lim); \
+ ASSERT((p)->base <= (p)->tl); \
+ ASSERT((p)->tl < (p)->lim);
+
typedef struct {
StgFunPtr stgGCEnter1;
StgFunPtr stgGCFun;
@@ -64,6 +72,14 @@ typedef union {
StgTSOPtr t;
} StgUnion;
+/*
+ * This is the table that holds shadow-locations for all the STG
+ * registers. The shadow locations are used when:
+ *
+ * 1) the particular register isn't mapped to a real machine
+ * register, probably because there's a shortage of real registers.
+ * 2) caller-saves registers are saved across a CCall
+ */
typedef struct StgRegTable_ {
StgUnion rR1;
StgUnion rR2;
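
The new Regs.h comment above describes the spark pool as a circular buffer: base <= hd < lim, base <= tl < lim, hd == tl means empty, and hd == tl+1 means full. A minimal standalone sketch of that discipline, assuming a generic Pool type with the same four pointers (illustrative only, not part of the patch; the RTS equivalents are bump_hd/bump_tl and newSpark in Sparks.c further down):

/* Sketch of the circular-buffer discipline described in the Regs.h comment.
 * pool_add takes the "throw away the most recent entry" option when full. */
#include <stddef.h>

typedef struct {
    void **base, **lim;   /* [base, lim) is the underlying array            */
    void **hd, **tl;      /* hd == tl: empty;  hd == tl+1 (mod size): full  */
} Pool;

static void **bump (Pool *p, void **q)        /* advance with wrap-around */
{ q++; return (q == p->lim) ? p->base : q; }

static int pool_is_empty (Pool *p) { return p->hd == p->tl; }
static int pool_is_full  (Pool *p) { return bump(p, p->tl) == p->hd; }

static int pool_add (Pool *p, void *v)        /* returns 0 if v was dropped */
{
    if (pool_is_full(p)) return 0;            /* discard the newest entry   */
    *p->tl = v;
    p->tl = bump(p, p->tl);
    return 1;
}

static void *pool_take (Pool *p)              /* NULL if the pool is empty  */
{
    void *v;
    if (pool_is_empty(p)) return NULL;
    v = *p->hd;
    p->hd = bump(p, p->hd);
    return v;
}

Keeping one slot unused (full is hd == tl+1 rather than hd == tl) is what lets hd == tl unambiguously mean empty.
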
diff --git a/ghc/includes/RtsFlags.h b/ghc/includes/RtsFlags.h
index a4c6d9ba58..4a37d48fd4 100644
--- a/ghc/includes/RtsFlags.h
+++ b/ghc/includes/RtsFlags.h
@@ -62,6 +62,7 @@ struct DEBUG_FLAGS {
rtsBool linker; /* 'l' the object linker */
rtsBool apply; /* 'a' */
rtsBool stm; /* 'm' */
+ rtsBool squeeze; /* 'z' stack squeezing & lazy blackholing */
};
struct COST_CENTRE_FLAGS {
diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h
index 1d381e656e..a9938aa9f4 100644
--- a/ghc/includes/StgMiscClosures.h
+++ b/ghc/includes/StgMiscClosures.h
@@ -37,6 +37,7 @@
/* Stack frames */
RTS_RET_INFO(stg_upd_frame_info);
+RTS_RET_INFO(stg_marked_upd_frame_info);
RTS_RET_INFO(stg_noupd_frame_info);
RTS_RET_INFO(stg_seq_frame_info);
RTS_RET_INFO(stg_catch_frame_info);
@@ -45,6 +46,7 @@ RTS_RET_INFO(stg_atomically_frame_info);
RTS_RET_INFO(stg_catch_stm_frame_info);
RTS_ENTRY(stg_upd_frame_ret);
+RTS_ENTRY(stg_marked_upd_frame_ret);
RTS_ENTRY(stg_seq_frame_ret);
/* Entry code for constructors created by the bytecode interpreter */
diff --git a/ghc/includes/Storage.h b/ghc/includes/Storage.h
index 1f6ef3f5e7..e37c50d054 100644
--- a/ghc/includes/Storage.h
+++ b/ghc/includes/Storage.h
@@ -392,7 +392,7 @@ extern lnat countNurseryBlocks ( void );
Functions from GC.c
-------------------------------------------------------------------------- */
-extern void threadPaused ( StgTSO * );
+extern void threadPaused ( Capability *cap, StgTSO * );
extern StgClosure * isAlive ( StgClosure *p );
extern void markCAFs ( evac_fn evac );
diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c
index d08bf23339..5872f427aa 100644
--- a/ghc/rts/Capability.c
+++ b/ghc/rts/Capability.c
@@ -23,6 +23,7 @@
#include "OSThreads.h"
#include "Capability.h"
#include "Schedule.h"
+#include "Sparks.h"
#if !defined(SMP)
Capability MainCapability; // for non-SMP, we have one global capability
@@ -74,6 +75,8 @@ anyWorkForMe( Capability *cap, Task *task )
} else {
return (cap->run_queue_hd->bound == task);
}
+ } else if (task->tso == NULL && !emptySparkPoolCap(cap)) {
+ return rtsTrue;
}
return globalWorkToDo();
}
@@ -263,7 +266,7 @@ releaseCapability_ (Capability* cap)
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || globalWorkToDo()) {
+ if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
diff --git a/ghc/rts/GC.c b/ghc/rts/GC.c
index 0399d6034e..bc8546a115 100644
--- a/ghc/rts/GC.c
+++ b/ghc/rts/GC.c
@@ -4250,74 +4250,6 @@ gcCAFs(void)
/* -----------------------------------------------------------------------------
- Lazy black holing.
-
- Whenever a thread returns to the scheduler after possibly doing
- some work, we have to run down the stack and black-hole all the
- closures referred to by update frames.
- -------------------------------------------------------------------------- */
-
-static void
-threadLazyBlackHole(StgTSO *tso)
-{
- StgClosure *frame;
- StgRetInfoTable *info;
- StgClosure *bh;
- StgPtr stack_end;
-
- stack_end = &tso->stack[tso->stack_size];
-
- frame = (StgClosure *)tso->sp;
-
- while (1) {
- info = get_ret_itbl(frame);
-
- switch (info->i.type) {
-
- case UPDATE_FRAME:
- bh = ((StgUpdateFrame *)frame)->updatee;
-
- /* if the thunk is already blackholed, it means we've also
- * already blackholed the rest of the thunks on this stack,
- * so we can stop early.
- *
- * The blackhole made for a CAF is a CAF_BLACKHOLE, so they
- * don't interfere with this optimisation.
- */
- if (bh->header.info == &stg_BLACKHOLE_info) {
- return;
- }
-
- if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
-#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
- debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
-#endif
-#ifdef PROFILING
- // @LDV profiling
- // We pretend that bh is now dead.
- LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
- SET_INFO(bh,&stg_BLACKHOLE_info);
-
- // We pretend that bh has just been created.
- LDV_RECORD_CREATE(bh);
- }
-
- frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
- break;
-
- case STOP_FRAME:
- return;
-
- // normal stack frames; do nothing except advance the pointer
- default:
- frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame));
- }
- }
-}
-
-
-/* -----------------------------------------------------------------------------
* Stack squeezing
*
* Code largely pinched from old RTS, then hacked to bits. We also do
@@ -4328,12 +4260,11 @@ threadLazyBlackHole(StgTSO *tso)
struct stack_gap { StgWord gap_size; struct stack_gap *next_gap; };
static void
-threadSqueezeStack(StgTSO *tso)
+stackSqueeze(StgTSO *tso, StgPtr bottom)
{
StgPtr frame;
rtsBool prev_was_update_frame;
StgClosure *updatee = NULL;
- StgPtr bottom;
StgRetInfoTable *info;
StgWord current_gap_size;
struct stack_gap *gap;
@@ -4344,8 +4275,6 @@ threadSqueezeStack(StgTSO *tso)
// contains two values: the size of the gap, and the distance
// to the next gap (or the stack top).
- bottom = &(tso->stack[tso->stack_size]);
-
frame = tso->sp;
ASSERT(frame < bottom);
@@ -4363,20 +4292,6 @@ threadSqueezeStack(StgTSO *tso)
{
StgUpdateFrame *upd = (StgUpdateFrame *)frame;
- if (upd->updatee->header.info == &stg_BLACKHOLE_info) {
-
- // found a BLACKHOLE'd update frame; we've been here
- // before, in a previous GC, so just break out.
-
- // Mark the end of the gap, if we're in one.
- if (current_gap_size != 0) {
- gap = (struct stack_gap *)(frame-sizeofW(StgUpdateFrame));
- }
-
- frame += sizeofW(StgUpdateFrame);
- goto done_traversing;
- }
-
if (prev_was_update_frame) {
TICK_UPD_SQUEEZED();
@@ -4409,31 +4324,6 @@ threadSqueezeStack(StgTSO *tso)
// single update frame, or the topmost update frame in a series
else {
- StgClosure *bh = upd->updatee;
-
- // Do lazy black-holing
- if (bh->header.info != &stg_BLACKHOLE_info &&
- bh->header.info != &stg_CAF_BLACKHOLE_info) {
-#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
- debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh);
-#endif
-#ifdef DEBUG
- // zero out the slop so that the sanity checker can tell
- // where the next closure is.
- DEBUG_FILL_SLOP(bh);
-#endif
-#ifdef PROFILING
- // We pretend that bh is now dead.
- // ToDo: is the slop filling the same as DEBUG_FILL_SLOP?
- LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
-#endif
- // Todo: maybe use SET_HDR() and remove LDV_RECORD_CREATE()?
- SET_INFO(bh,&stg_BLACKHOLE_info);
-
- // We pretend that bh has just been created.
- LDV_RECORD_CREATE(bh);
- }
-
prev_was_update_frame = rtsTrue;
updatee = upd->updatee;
frame += sizeofW(StgUpdateFrame);
@@ -4456,8 +4346,10 @@ threadSqueezeStack(StgTSO *tso)
}
}
-done_traversing:
-
+ if (current_gap_size != 0) {
+ gap = (struct stack_gap *) (frame - sizeofW(StgUpdateFrame));
+ }
+
// Now we have a stack with gaps in it, and we have to walk down
// shoving the stack up to fill in the gaps. A diagram might
// help:
@@ -4515,12 +4407,110 @@ done_traversing:
* turned on.
* -------------------------------------------------------------------------- */
void
-threadPaused(StgTSO *tso)
+threadPaused(Capability *cap, StgTSO *tso)
{
- if ( RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue )
- threadSqueezeStack(tso); // does black holing too
- else
- threadLazyBlackHole(tso);
+ StgClosure *frame;
+ StgRetInfoTable *info;
+ StgClosure *bh;
+ StgPtr stack_end;
+ nat words_to_squeeze = 0;
+ nat weight = 0;
+ nat weight_pending = 0;
+ rtsBool prev_was_update_frame = rtsFalse;
+
+ stack_end = &tso->stack[tso->stack_size];
+
+ frame = (StgClosure *)tso->sp;
+
+ while (1) {
+ // If we've already marked this frame, then stop here.
+ if (frame->header.info == (StgInfoTable *)&stg_marked_upd_frame_info) {
+ goto end;
+ }
+
+ info = get_ret_itbl(frame);
+
+ switch (info->i.type) {
+
+ case UPDATE_FRAME:
+
+ SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info);
+
+ bh = ((StgUpdateFrame *)frame)->updatee;
+
+ if (closure_IND(bh) || bh->header.info == &stg_BLACKHOLE_info) {
+ IF_DEBUG(squeeze, debugBelch("suspending duplicate work: %d words of stack\n", (StgPtr)frame - tso->sp));
+
+ // If this closure is already an indirection, then
+ // suspend the computation up to this point:
+ suspendComputation(cap,tso,(StgPtr)frame);
+
+ // Now drop the update frame, and arrange to return
+ // the value to the frame underneath:
+ tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2;
+ tso->sp[1] = (StgWord)bh;
+ tso->sp[0] = (W_)&stg_enter_info;
+
+ // And continue with threadPaused; there might be
+ // yet more computation to suspend.
+ threadPaused(cap,tso);
+ return;
+ }
+
+ if (bh->header.info != &stg_CAF_BLACKHOLE_info) {
+#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG)
+ debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh);
+#endif
+ // zero out the slop so that the sanity checker can tell
+ // where the next closure is.
+ DEBUG_FILL_SLOP(bh);
+#ifdef PROFILING
+ // @LDV profiling
+ // We pretend that bh is now dead.
+ LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh);
+#endif
+ SET_INFO(bh,&stg_BLACKHOLE_info);
+
+ // We pretend that bh has just been created.
+ LDV_RECORD_CREATE(bh);
+ }
+
+ frame = (StgClosure *) ((StgUpdateFrame *)frame + 1);
+ if (prev_was_update_frame) {
+ words_to_squeeze += sizeofW(StgUpdateFrame);
+ weight += weight_pending;
+ weight_pending = 0;
+ }
+ prev_was_update_frame = rtsTrue;
+ break;
+
+ case STOP_FRAME:
+ goto end;
+
+ // normal stack frames; do nothing except advance the pointer
+ default:
+ {
+ nat frame_size = stack_frame_sizeW(frame);
+ weight_pending += frame_size;
+ frame = (StgClosure *)((StgPtr)frame + frame_size);
+ prev_was_update_frame = rtsFalse;
+ }
+ }
+ }
+
+end:
+ IF_DEBUG(squeeze,
+ debugBelch("words_to_squeeze: %d, weight: %d, squeeze: %s\n",
+ words_to_squeeze, weight,
+ weight < words_to_squeeze ? "YES" : "NO"));
+
+ // Should we squeeze or not? Arbitrary heuristic: we squeeze if
+ // the number of words we have to shift down is less than the
+ // number of stack words we squeeze away by doing so.
+ if (1 /*RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue &&
+ weight < words_to_squeeze*/) {
+ stackSqueeze(tso, (StgPtr)frame);
+ }
}
/* -----------------------------------------------------------------------------
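
The rewritten threadPaused above walks the stack once, black-holing thunks under update frames, suspending any computation whose updatee has already been updated elsewhere, and accumulating two counters: words_to_squeeze (the update-frame words that squeezing would reclaim) and weight (the frame words that would have to be shifted down to close the gaps). The squeeze is currently forced on by the if (1 ...), but the commented-out condition shows the intended heuristic. A hedged sketch of that decision, reusing the counter and flag names from the patch:

/* Sketch of the intended squeeze decision (assumption: it mirrors the
 * commented-out condition and the counters gathered by threadPaused). */
typedef unsigned int nat;

static int
should_squeeze (nat words_to_squeeze,  /* dead words reclaimed by squeezing */
                nat weight,            /* live words that must be shifted   */
                int squeeze_enabled)   /* RtsFlags.GcFlags.squeezeUpdFrames */
{
    /* Squeeze only when the stack we would have to copy (weight) is smaller
     * than the stack space the squeeze reclaims (words_to_squeeze). */
    return squeeze_enabled && weight < words_to_squeeze;
}

stackSqueeze would then run only when the copying cost is outweighed by the space it frees.
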
diff --git a/ghc/rts/Interpreter.c b/ghc/rts/Interpreter.c
index f007c4abd9..b31ade08fb 100644
--- a/ghc/rts/Interpreter.c
+++ b/ghc/rts/Interpreter.c
@@ -57,7 +57,7 @@
#define RETURN_TO_SCHEDULER(todo,retcode) \
SAVE_STACK_POINTERS; \
cap->r.rCurrentTSO->what_next = (todo); \
- threadPaused(cap->r.rCurrentTSO); \
+ threadPaused(cap,cap->r.rCurrentTSO); \
cap->r.rRet = (retcode); \
return cap;
diff --git a/ghc/rts/RtsFlags.c b/ghc/rts/RtsFlags.c
index f086368a16..25d53da1fe 100644
--- a/ghc/rts/RtsFlags.c
+++ b/ghc/rts/RtsFlags.c
@@ -190,6 +190,7 @@ void initRtsFlagsDefaults(void)
RtsFlags.DebugFlags.gran = rtsFalse;
RtsFlags.DebugFlags.par = rtsFalse;
RtsFlags.DebugFlags.linker = rtsFalse;
+ RtsFlags.DebugFlags.squeeze = rtsFalse;
#endif
#if defined(PROFILING) || defined(PAR)
@@ -431,6 +432,7 @@ usage_text[] = {
" -DP DEBUG: par",
" -Dl DEBUG: linker",
" -Dm DEBUG: stm",
+" -Dz DEBUG: stack squezing",
"",
#endif /* DEBUG */
#if defined(SMP)
@@ -726,6 +728,9 @@ error = rtsTrue;
case 'm':
RtsFlags.DebugFlags.stm = rtsTrue;
break;
+ case 'z':
+ RtsFlags.DebugFlags.squeeze = rtsTrue;
+ break;
default:
bad_option( rts_argv[arg] );
}
diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c
index 299e132548..4891bbf9dd 100644
--- a/ghc/rts/Schedule.c
+++ b/ghc/rts/Schedule.c
@@ -250,7 +250,7 @@ static void AllRoots(evac_fn evac);
static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso);
static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically);
+ rtsBool stop_at_atomically, StgPtr stop_here);
static void deleteThread (Capability *cap, StgTSO *tso);
static void deleteRunQueue (Capability *cap);
@@ -396,7 +396,7 @@ schedule (Capability *initialCapability, Task *task)
#ifdef SMP
schedulePushWork(cap,task);
-#endif
+#endif
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -415,6 +415,9 @@ schedule (Capability *initialCapability, Task *task)
//
if (interrupted) {
deleteRunQueue(cap);
+#if defined(SMP)
+ discardSparksCap(cap);
+#endif
if (shutting_down_scheduler) {
IF_DEBUG(scheduler, sched_belch("shutting down"));
// If we are a worker, just exit. If we're a bound thread
@@ -428,23 +431,17 @@ schedule (Capability *initialCapability, Task *task)
}
}
-#if defined(not_yet) && defined(SMP)
- //
- // Top up the run queue from our spark pool. We try to make the
- // number of threads in the run queue equal to the number of
- // free capabilities.
- //
+#if defined(SMP)
+ // If the run queue is empty, take a spark and turn it into a thread.
{
- StgClosure *spark;
- if (emptyRunQueue()) {
- spark = findSpark(rtsFalse);
- if (spark == NULL) {
- break; /* no more sparks in the pool */
- } else {
- createSparkThread(spark);
+ if (emptyRunQueue(cap)) {
+ StgClosure *spark;
+ spark = findSpark(cap);
+ if (spark != NULL) {
IF_DEBUG(scheduler,
- sched_belch("==^^ turning spark of closure %p into a thread",
+ sched_belch("turning spark of closure %p into a thread",
(StgClosure *)spark));
+ createSparkThread(cap,spark);
}
}
}
@@ -739,9 +736,10 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
- // Check whether we have more threads on our run queue that we
- // could hand to another Capability.
- if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) {
+ // Check whether we have more threads on our run queue, or sparks
+ // in our pool, that we could hand to another Capability.
+ if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE)
+ && sparkPoolSizeCap(cap) < 2) {
return;
}
@@ -772,31 +770,54 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
if (n_free_caps > 0) {
StgTSO *prev, *t, *next;
+ rtsBool pushed_to_all;
+
IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps));
- prev = cap->run_queue_hd;
- t = prev->link;
- prev->link = END_TSO_QUEUE;
i = 0;
- for (; t != END_TSO_QUEUE; t = next) {
- next = t->link;
- t->link = END_TSO_QUEUE;
- if (t->what_next == ThreadRelocated
- || t->bound == task) { // don't move my bound thread
- prev->link = t;
- prev = t;
- } else if (i == n_free_caps) {
- i = 0;
- // keep one for us
- prev->link = t;
- prev = t;
- } else {
- appendToRunQueue(free_caps[i],t);
- if (t->bound) { t->bound->cap = free_caps[i]; }
- i++;
+ pushed_to_all = rtsFalse;
+
+ if (cap->run_queue_hd != END_TSO_QUEUE) {
+ prev = cap->run_queue_hd;
+ t = prev->link;
+ prev->link = END_TSO_QUEUE;
+ for (; t != END_TSO_QUEUE; t = next) {
+ next = t->link;
+ t->link = END_TSO_QUEUE;
+ if (t->what_next == ThreadRelocated
+ || t->bound == task) { // don't move my bound thread
+ prev->link = t;
+ prev = t;
+ } else if (i == n_free_caps) {
+ pushed_to_all = rtsTrue;
+ i = 0;
+ // keep one for us
+ prev->link = t;
+ prev = t;
+ } else {
+ appendToRunQueue(free_caps[i],t);
+ if (t->bound) { t->bound->cap = free_caps[i]; }
+ i++;
+ }
+ }
+ cap->run_queue_tl = prev;
+ }
+
+ // If there are some free capabilities that we didn't push any
+ // threads to, then try to push a spark to each one.
+ if (!pushed_to_all) {
+ StgClosure *spark;
+ // i is the next free capability to push to
+ for (; i < n_free_caps; i++) {
+ if (emptySparkPoolCap(free_caps[i])) {
+ spark = findSpark(cap);
+ if (spark != NULL) {
+ IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no));
+ newSpark(&(free_caps[i]->r), spark);
+ }
+ }
}
}
- cap->run_queue_tl = prev;
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
@@ -812,15 +833,20 @@ schedulePushWork(Capability *cap USED_WHEN_SMP,
* Start any pending signal handlers
* ------------------------------------------------------------------------- */
+#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
static void
scheduleStartSignalHandlers(Capability *cap)
{
-#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS))
if (signals_pending()) { // safe outside the lock
startSignalHandlers(cap);
}
-#endif
}
+#else
+static void
+scheduleStartSignalHandlers(Capability *cap STG_UNUSED)
+{
+}
+#endif
/* ----------------------------------------------------------------------------
* Check for blocked threads that can be woken up.
@@ -1926,7 +1952,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major )
// ATOMICALLY_FRAME, aborting the (nested)
// transaction, and saving the stack of any
// partially-evaluated thunks on the heap.
- raiseAsync_(cap, t, NULL, rtsTrue);
+ raiseAsync_(cap, t, NULL, rtsTrue, NULL);
#ifdef REG_R1
ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME);
@@ -2165,7 +2191,7 @@ suspendThread (StgRegTable *reg)
// XXX this might not be necessary --SDM
tso->what_next = ThreadRunGHC;
- threadPaused(tso);
+ threadPaused(cap,tso);
if(tso->blocked_exceptions == NULL) {
tso->why_blocked = BlockedOnCCall;
@@ -2660,6 +2686,10 @@ initScheduler(void)
initTaskManager();
+#if defined(SMP) || defined(PARALLEL_HASKELL)
+ initSparkPools();
+#endif
+
#if defined(SMP)
/*
* Eagerly start one worker to run each Capability, except for
@@ -2679,10 +2709,6 @@ initScheduler(void)
}
#endif
-#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL)
- initSparkPools();
-#endif
-
RELEASE_LOCK(&sched_mutex);
}
@@ -2772,7 +2798,7 @@ GetRoots( evac_fn evac )
evac((StgClosure **)&blackhole_queue);
-#if defined(PARALLEL_HASKELL) || defined(GRAN)
+#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN)
markSparkQueue(evac);
#endif
@@ -3607,15 +3633,22 @@ checkBlackHoles (Capability *cap)
void
raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception)
{
- raiseAsync_(cap, tso, exception, rtsFalse);
+ raiseAsync_(cap, tso, exception, rtsFalse, NULL);
+}
+
+void
+suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here)
+{
+ raiseAsync_(cap, tso, NULL, rtsFalse, stop_here);
}
static void
raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
- rtsBool stop_at_atomically)
+ rtsBool stop_at_atomically, StgPtr stop_here)
{
StgRetInfoTable *info;
- StgPtr sp;
+ StgPtr sp, frame;
+ nat i;
// Thread already dead?
if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) {
@@ -3640,8 +3673,8 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
sp[0] = (W_)&stg_dummy_ret_closure;
}
- while (1) {
- nat i;
+ frame = sp + 1;
+ while (stop_here == NULL || frame < stop_here) {
// 1. Let the top of the stack be the "current closure"
//
@@ -3661,95 +3694,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
// NB: if we pass an ATOMICALLY_FRAME then abort the associated
// transaction
-
- StgPtr frame;
-
- frame = sp + 1;
info = get_ret_itbl((StgClosure *)frame);
-
- while (info->i.type != UPDATE_FRAME
- && (info->i.type != CATCH_FRAME || exception == NULL)
- && info->i.type != STOP_FRAME
- && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse))
- {
- if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) {
- // IF we find an ATOMICALLY_FRAME then we abort the
- // current transaction and propagate the exception. In
- // this case (unlike ordinary exceptions) we do not care
- // whether the transaction is valid or not because its
- // possible validity cannot have caused the exception
- // and will not be visible after the abort.
- IF_DEBUG(stm,
- debugBelch("Found atomically block delivering async exception\n"));
- stmAbortTransaction(tso -> trec);
- tso -> trec = stmGetEnclosingTRec(tso -> trec);
- }
- frame += stack_frame_sizeW((StgClosure *)frame);
- info = get_ret_itbl((StgClosure *)frame);
- }
-
+
switch (info->i.type) {
-
- case ATOMICALLY_FRAME:
- ASSERT(stop_at_atomically);
- ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
- stmCondemnTransaction(tso -> trec);
-#ifdef REG_R1
- tso->sp = frame;
-#else
- // R1 is not a register: the return convention for IO in
- // this case puts the return value on the stack, so we
- // need to set up the stack to return to the atomically
- // frame properly...
- tso->sp = frame - 2;
- tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
- tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
-#endif
- tso->what_next = ThreadRunGHC;
- return;
- case CATCH_FRAME:
- // If we find a CATCH_FRAME, and we've got an exception to raise,
- // then build the THUNK raise(exception), and leave it on
- // top of the CATCH_FRAME ready to enter.
- //
- {
-#ifdef PROFILING
- StgCatchFrame *cf = (StgCatchFrame *)frame;
-#endif
- StgThunk *raise;
-
- // we've got an exception to raise, so let's pass it to the
- // handler in this frame.
- //
- raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
- TICK_ALLOC_SE_THK(1,0);
- SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
- raise->payload[0] = exception;
-
- // throw away the stack from Sp up to the CATCH_FRAME.
- //
- sp = frame - 1;
-
- /* Ensure that async excpetions are blocked now, so we don't get
- * a surprise exception before we get around to executing the
- * handler.
- */
- if (tso->blocked_exceptions == NULL) {
- tso->blocked_exceptions = END_TSO_QUEUE;
- }
-
- /* Put the newly-built THUNK on top of the stack, ready to execute
- * when the thread restarts.
- */
- sp[0] = (W_)raise;
- sp[-1] = (W_)&stg_enter_info;
- tso->sp = sp-1;
- tso->what_next = ThreadRunGHC;
- IF_DEBUG(sanity, checkTSO(tso));
- return;
- }
-
case UPDATE_FRAME:
{
StgAP_STACK * ap;
@@ -3780,9 +3728,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
printObj((StgClosure *)ap);
);
- // Replace the updatee with an indirection - happily
- // this will also wake up any threads currently
- // waiting on the result.
+ // Replace the updatee with an indirection
//
// Warning: if we're in a loop, more than one update frame on
// the stack may point to the same object. Be careful not to
@@ -3799,20 +3745,104 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception,
}
sp += sizeofW(StgUpdateFrame) - 1;
sp[0] = (W_)ap; // push onto stack
+ frame = sp + 1;
break;
}
-
+
case STOP_FRAME:
// We've stripped the entire stack, the thread is now dead.
tso->what_next = ThreadKilled;
tso->sp = frame + sizeofW(StgStopFrame);
return;
+
+ case CATCH_FRAME:
+ // If we find a CATCH_FRAME, and we've got an exception to raise,
+ // then build the THUNK raise(exception), and leave it on
+ // top of the CATCH_FRAME ready to enter.
+ //
+ {
+#ifdef PROFILING
+ StgCatchFrame *cf = (StgCatchFrame *)frame;
+#endif
+ StgThunk *raise;
+
+ if (exception == NULL) break;
+
+ // we've got an exception to raise, so let's pass it to the
+ // handler in this frame.
+ //
+ raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE);
+ TICK_ALLOC_SE_THK(1,0);
+ SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs);
+ raise->payload[0] = exception;
+
+ // throw away the stack from Sp up to the CATCH_FRAME.
+ //
+ sp = frame - 1;
+
+ /* Ensure that async exceptions are blocked now, so we don't get
+ * a surprise exception before we get around to executing the
+ * handler.
+ */
+ if (tso->blocked_exceptions == NULL) {
+ tso->blocked_exceptions = END_TSO_QUEUE;
+ }
+
+ /* Put the newly-built THUNK on top of the stack, ready to execute
+ * when the thread restarts.
+ */
+ sp[0] = (W_)raise;
+ sp[-1] = (W_)&stg_enter_info;
+ tso->sp = sp-1;
+ tso->what_next = ThreadRunGHC;
+ IF_DEBUG(sanity, checkTSO(tso));
+ return;
+ }
+
+ case ATOMICALLY_FRAME:
+ if (stop_at_atomically) {
+ ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC);
+ stmCondemnTransaction(tso -> trec);
+#ifdef REG_R1
+ tso->sp = frame;
+#else
+ // R1 is not a register: the return convention for IO in
+ // this case puts the return value on the stack, so we
+ // need to set up the stack to return to the atomically
+ // frame properly...
+ tso->sp = frame - 2;
+ tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not?
+ tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info;
+#endif
+ tso->what_next = ThreadRunGHC;
+ return;
+ }
+ // Not stop_at_atomically... fall through and abort the
+ // transaction.
+
+ case CATCH_RETRY_FRAME:
+ // IF we find an ATOMICALLY_FRAME then we abort the
+ // current transaction and propagate the exception. In
+ // this case (unlike ordinary exceptions) we do not care
+ // whether the transaction is valid or not because its
+ // possible validity cannot have caused the exception
+ // and will not be visible after the abort.
+ IF_DEBUG(stm,
+ debugBelch("Found atomically block delivering async exception\n"));
+ stmAbortTransaction(tso -> trec);
+ tso -> trec = stmGetEnclosingTRec(tso -> trec);
+ break;
default:
- barf("raiseAsync");
+ break;
}
+
+ // move on to the next stack frame
+ frame += stack_frame_sizeW((StgClosure *)frame);
}
- barf("raiseAsync");
+
+ // if we got here, then we stopped at stop_here
+ ASSERT(stop_here != NULL);
}
/* -----------------------------------------------------------------------------
@@ -4156,6 +4186,7 @@ printAllThreads(void)
}
}
+ debugBelch("other threads:\n");
for (t = all_threads; t != END_TSO_QUEUE; t = next) {
if (t->why_blocked != NotBlocked) {
printThreadStatus(t);
diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h
index 9335e594e6..1626852997 100644
--- a/ghc/rts/Schedule.h
+++ b/ghc/rts/Schedule.h
@@ -55,6 +55,14 @@ StgTSO * unblockOne(Capability *cap, StgTSO *tso);
*/
void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception);
+/* suspendComputation()
+ *
+ * A variant of raiseAsync(), this strips the stack of the specified
+ * thread down to the stop_here point, leaving a current closure on
+ * top of the stack at [stop_here - 1].
+ */
+void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here);
+
/* raiseExceptionHelper */
StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception);
diff --git a/ghc/rts/Sparks.c b/ghc/rts/Sparks.c
index 6e638d4106..12af296380 100644
--- a/ghc/rts/Sparks.c
+++ b/ghc/rts/Sparks.c
@@ -1,24 +1,11 @@
/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2000
+ * (c) The GHC Team, 2000-2005
*
- * Sparking support for PAR and SMP versions of the RTS.
+ * Sparking support for PARALLEL_HASKELL and SMP versions of the RTS.
*
* -------------------------------------------------------------------------*/
-//@node Spark Management Routines, , ,
-//@section Spark Management Routines
-
-//@menu
-//* Includes::
-//* GUM code::
-//* GranSim code::
-//@end menu
-//*/
-
-//@node Includes, GUM code, Spark Management Routines, Spark Management Routines
-//@subsection Includes
-
#include "PosixSource.h"
#include "Rts.h"
#include "Schedule.h"
@@ -27,7 +14,7 @@
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParTicky.h"
-# if defined(PAR)
+# if defined(PARALLEL_HASKELL)
# include "ParallelRts.h"
# include "GranSimRts.h" // for GR_...
# elif defined(GRAN)
@@ -35,92 +22,216 @@
# endif
#include "Sparks.h"
-#if /*defined(SMP) ||*/ defined(PAR)
+#if defined(SMP) || defined(PARALLEL_HASKELL)
-//@node GUM code, GranSim code, Includes, Spark Management Routines
-//@subsection GUM code
+static INLINE_ME void bump_hd (StgSparkPool *p)
+{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
-static void slide_spark_pool( StgSparkPool *pool );
+static INLINE_ME void bump_tl (StgSparkPool *p)
+{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
-void
-initSparkPools( void )
-{
- Capability *cap;
- StgSparkPool *pool;
+/* -----------------------------------------------------------------------------
+ *
+ * Initialising spark pools.
+ *
+ * -------------------------------------------------------------------------- */
-#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
-#endif
- pool = &(cap->rSparks);
-
+static void
+initSparkPool(StgSparkPool *pool)
+{
pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
- * sizeof(StgClosure *),
- "initSparkPools");
+ * sizeof(StgClosure *),
+ "initSparkPools");
pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
pool->hd = pool->base;
pool->tl = pool->base;
- }
}
-/*
- We traverse the spark pool until we find the 2nd usable (i.e. non-NF)
- spark. Rationale, we don't want to give away the only work a PE has.
- ToDo: introduce low- and high-water-marks for load balancing.
-*/
-StgClosure *
-findSpark( rtsBool for_export )
+void
+initSparkPools( void )
{
- Capability *cap;
- StgSparkPool *pool;
- StgClosure *spark, *first=NULL;
- rtsBool isIdlePE = EMPTY_RUN_QUEUE();
-
#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
+ /* walk over the capabilities, allocating a spark pool for each one */
+ nat i;
+ for (i = 0; i < n_capabilities; i++) {
+ initSparkPool(&capabilities[i].r.rSparks);
+ }
#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
+ /* allocate a single spark pool */
+ initSparkPool(&MainCapability.r.rSparks);
#endif
- pool = &(cap->rSparks);
- while (pool->hd < pool->tl) {
- spark = *pool->hd++;
- if (closure_SHOULD_SPARK(spark)) {
- if (for_export && isIdlePE) {
- if (first==NULL) {
- first = spark; // keep the first usable spark if PE is idle
- } else {
- pool->hd--; // found a second spark; keep it in the pool
- ASSERT(*pool->hd==spark);
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * findSpark: find a spark on the current Capability that we can fork
+ * into a thread.
+ *
+ * -------------------------------------------------------------------------- */
+
+StgClosure *
+findSpark (Capability *cap)
+{
+ StgSparkPool *pool;
+ StgClosure *spark;
+
+ pool = &(cap->r.rSparks);
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+ while (pool->hd != pool->tl) {
+ spark = *pool->hd;
+ bump_hd(pool);
+ if (closure_SHOULD_SPARK(spark)) {
+#ifdef GRAN
if (RtsFlags.ParFlags.ParStats.Sparks)
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_STEALING, ((StgTSO *)NULL), first,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- return first; // and return the *first* spark found
- }
- } else {
- if (RtsFlags.ParFlags.ParStats.Sparks && for_export)
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_STEALING, ((StgTSO *)NULL), spark,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
- return spark; // return first spark found
+ DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
+ GR_STEALING, ((StgTSO *)NULL), spark,
+ 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
+#endif
+ return spark;
}
- }
}
- slide_spark_pool(pool);
- }
- return NULL;
+ // spark pool is now empty
+ return NULL;
}
-/*
- activateSpark is defined in Schedule.c
-*/
+/* -----------------------------------------------------------------------------
+ * Mark all nodes pointed to by sparks in the spark queues (for GC). Does an
+ * implicit slide, i.e. after marking, the live sparks sit contiguously from
+ * the head of the pool and the pool contains only sparkable closures.
+ * -------------------------------------------------------------------------- */
+
+void
+markSparkQueue (evac_fn evac)
+{
+ StgClosure **sparkp, **to_sparkp;
+ nat i, n, pruned_sparks; // stats only
+ StgSparkPool *pool;
+ Capability *cap;
+
+ PAR_TICKY_MARK_SPARK_QUEUE_START();
+
+ n = 0;
+ pruned_sparks = 0;
+ for (i = 0; i < n_capabilities; i++) {
+ cap = &capabilities[i];
+ pool = &(cap->r.rSparks);
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+#if defined(PARALLEL_HASKELL)
+ // stats only
+ n = 0;
+ pruned_sparks = 0;
+#endif
+
+ sparkp = pool->hd;
+ to_sparkp = pool->hd;
+ while (sparkp != pool->tl) {
+ ASSERT(to_sparkp<=sparkp);
+ ASSERT(*sparkp!=NULL);
+ ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
+ // ToDo?: statistics gathering here (also for GUM!)
+ if (closure_SHOULD_SPARK(*sparkp)) {
+ evac(sparkp);
+ *to_sparkp++ = *sparkp;
+ if (to_sparkp == pool->lim) {
+ to_sparkp = pool->base; // wrap the write pointer, like sparkp below
+ }
+ n++;
+ } else {
+ pruned_sparks++;
+ }
+ sparkp++;
+ if (sparkp == pool->lim) {
+ sparkp = pool->base;
+ }
+ }
+ pool->tl = to_sparkp;
+
+ PAR_TICKY_MARK_SPARK_QUEUE_END(n);
+
+#if defined(PARALLEL_HASKELL)
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
+ n, pruned_sparks, mytid));
+#else
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n",
+ n, pruned_sparks));
+#endif
+
+ IF_DEBUG(scheduler,
+ debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)\n",
+ sparkPoolSize(pool), pool->hd, pool->tl));
+
+ }
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * Turn a spark into a real thread
+ *
+ * -------------------------------------------------------------------------- */
+
+void
+createSparkThread (Capability *cap, StgClosure *p)
+{
+ StgTSO *tso;
+
+ tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
+ appendToRunQueue(cap,tso);
+}
+
+/* -----------------------------------------------------------------------------
+ *
+ * Create a new spark
+ *
+ * -------------------------------------------------------------------------- */
+
+#define DISCARD_NEW
+
+StgInt
+newSpark (StgRegTable *reg, StgClosure *p)
+{
+ StgSparkPool *pool = &(reg->rSparks);
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+ if (closure_SHOULD_SPARK(p)) {
+#ifdef DISCARD_NEW
+ StgClosure **new_tl;
+ new_tl = pool->tl + 1;
+ if (new_tl == pool->lim) { new_tl = pool->base; }
+ if (new_tl != pool->hd) {
+ *pool->tl = p;
+ pool->tl = new_tl;
+ } else if (!closure_SHOULD_SPARK(*pool->hd)) {
+ // if the old closure is not sparkable, discard it and
+ // keep the new one. Otherwise, keep the old one.
+ bump_hd(pool); // drop the stale oldest entry...
+ *pool->tl = p; // ...and put the new spark in its place
+ bump_tl(pool); // the pool is full again, now including p
+ }
+#else /* DISCARD OLD */
+ *pool->tl = p;
+ bump_tl(pool);
+ if (pool->tl == pool->hd) { bump_hd(pool); }
+#endif
+ }
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+ return 1;
+}
+
+#endif /* PARALLEL_HASKELL || SMP */
+
+/* -----------------------------------------------------------------------------
+ *
+ * GRAN & PARALLEL_HASKELL stuff beyond here.
+ *
+ * -------------------------------------------------------------------------- */
+
+#if defined(PARALLEL_HASKELL) || defined(GRAN)
+
+static void slide_spark_pool( StgSparkPool *pool );
+
rtsBool
add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
{
@@ -131,7 +242,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
pool->tl < pool->lim) {
*(pool->tl++) = closure;
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
// collect parallel global statistics (currently done together with GC stats)
if (RtsFlags.ParFlags.ParStats.Global &&
RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -141,7 +252,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool )
#endif
return rtsTrue;
} else {
-#if defined(PAR)
+#if defined(PARALLEL_HASKELL)
// collect parallel global statistics (currently done together with GC stats)
if (RtsFlags.ParFlags.ParStats.Global &&
RtsFlags.GcFlags.giveStats > NO_GC_STATS) {
@@ -175,88 +286,6 @@ slide_spark_pool( StgSparkPool *pool )
pool->tl = to_sparkp;
}
-nat
-spark_queue_len( StgSparkPool *pool )
-{
- return (nat) (pool->tl - pool->hd);
-}
-
-/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- implicit slide i.e. after marking all sparks are at the beginning of the
- spark pool and the spark pool only contains sparkable closures
-*/
-void
-markSparkQueue( void )
-{
- StgClosure **sparkp, **to_sparkp;
- nat n, pruned_sparks; // stats only
- StgSparkPool *pool;
- Capability *cap;
-
- PAR_TICKY_MARK_SPARK_QUEUE_START();
-
-#ifdef SMP
- /* walk over the capabilities, allocating a spark pool for each one */
- for (cap = free_capabilities; cap != NULL; cap = cap->link) {
-#else
- /* allocate a single spark pool */
- cap = &MainRegTable;
- {
-#endif
- pool = &(cap->rSparks);
-
-#if defined(PAR)
- // stats only
- n = 0;
- pruned_sparks = 0;
-#endif
-
- sparkp = pool->hd;
- to_sparkp = pool->base;
- while (sparkp < pool->tl) {
- ASSERT(to_sparkp<=sparkp);
- ASSERT(*sparkp!=NULL);
- ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info));
- // ToDo?: statistics gathering here (also for GUM!)
- if (closure_SHOULD_SPARK(*sparkp)) {
- *to_sparkp = MarkRoot(*sparkp);
- to_sparkp++;
-#ifdef PAR
- n++;
-#endif
- } else {
-#ifdef PAR
- pruned_sparks++;
-#endif
- }
- sparkp++;
- }
- pool->hd = pool->base;
- pool->tl = to_sparkp;
-
- PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-
-#if defined(SMP)
- IF_DEBUG(scheduler,
- debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
- n, pruned_sparks, pthread_self()));
-#elif defined(PAR)
- IF_DEBUG(scheduler,
- debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]",
- n, pruned_sparks, mytid));
-#else
- IF_DEBUG(scheduler,
- debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks",
- n, pruned_sparks));
-#endif
-
- IF_DEBUG(scheduler,
- debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)",
- spark_queue_len(pool), pool->hd, pool->tl));
-
- }
-}
-
void
disposeSpark(spark)
StgClosure *spark;
@@ -276,22 +305,11 @@ StgClosure *spark;
#elif defined(GRAN)
-//@node GranSim code, , GUM code, Spark Management Routines
-//@subsection GranSim code
-
-//@menu
-//* Basic interface to sparkq::
-//* Aux fcts::
-//@end menu
-
-//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code
-//@subsubsection Basic interface to sparkq
/*
Search the spark queue of the proc in event for a spark that's worth
turning into a thread
(was gimme_spark in the old RTS)
*/
-//@cindex findLocalSpark
void
findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
{
@@ -423,7 +441,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res)
node pointed to by the spark at some point in the future.
(was munch_spark in the old RTS)
*/
-//@cindex activateSpark
rtsBool
activateSpark (rtsEvent *event, rtsSparkQ spark)
{
@@ -530,7 +547,6 @@ STATIC_INLINE nat IGNORE(nat x) { return (0); };
STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); }
/* NB: size_info and par_info are currently unused (what a shame!) -- HWL */
-//@cindex newSpark
rtsSpark *
newSpark(node,name,gran_info,size_info,par_info,local)
StgClosure *node;
@@ -567,7 +583,6 @@ nat name, gran_info, size_info, par_info, local;
return(newspark);
}
-//@cindex disposeSpark
void
disposeSpark(spark)
rtsSpark *spark;
@@ -576,7 +591,6 @@ rtsSpark *spark;
stgFree(spark);
}
-//@cindex disposeSparkQ
void
disposeSparkQ(spark)
rtsSparkQ spark;
@@ -602,7 +616,6 @@ rtsSparkQ spark;
the queue.
*/
-//@cindex add_to_spark_queue
void
add_to_spark_queue(spark)
rtsSpark *spark;
@@ -709,10 +722,6 @@ rtsSpark *spark;
# endif
}
-//@node Aux fcts, , Basic interface to sparkq, GranSim code
-//@subsubsection Aux fcts
-
-//@cindex spark_queue_len
nat
spark_queue_len(proc)
PEs proc;
@@ -740,7 +749,6 @@ PEs proc;
hd and tl pointers of the spark queue. Returns a pointer to the next
spark in the queue.
*/
-//@cindex delete_from_sparkq
rtsSpark *
delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */
rtsSpark *spark;
@@ -793,7 +801,6 @@ rtsBool dispose_too;
}
/* Mark all nodes pointed to by sparks in the spark queues (for GC) */
-//@cindex markSparkQueue
void
markSparkQueue(void)
{
@@ -813,7 +820,6 @@ markSparkQueue(void)
print_sparkq_stats());
}
-//@cindex print_spark
void
print_spark(spark)
rtsSpark *spark;
@@ -835,7 +841,6 @@ rtsSpark *spark;
}
}
-//@cindex print_sparkq
void
print_sparkq(proc)
PEs proc;
@@ -852,7 +857,6 @@ PEs proc;
/*
Print a statistics of all spark queues.
*/
-//@cindex print_sparkq_stats
void
print_sparkq_stats(void)
{
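
markSparkQueue above traverses the circular region [hd, tl), evacuating sparks that are still worth running, compacting them towards hd, and finally pulling tl back to the end of the surviving run. An illustrative sketch of that pruning pass on a plain circular buffer (not the RTS code; keep stands in for closure_SHOULD_SPARK and GC evacuation is omitted):

/* Sketch of the in-place pruning pass over the live region [hd, tl). */
typedef struct {
    void **base, **lim, **hd, **tl;
} Pool;

static void
prune_pool (Pool *p, int (*keep)(void *))
{
    void **from = p->hd, **to = p->hd;

    while (from != p->tl) {
        if (keep(*from)) {
            *to = *from;                       /* compact survivors towards hd */
            to++;
            if (to == p->lim) to = p->base;    /* wrap the write pointer       */
        }
        from++;
        if (from == p->lim) from = p->base;    /* wrap the read pointer        */
    }
    p->tl = to;                                /* entries in [hd, tl) survive  */
}
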
diff --git a/ghc/rts/Sparks.h b/ghc/rts/Sparks.h
index 3d7687a543..1cc92eba70 100644
--- a/ghc/rts/Sparks.h
+++ b/ghc/rts/Sparks.h
@@ -9,8 +9,31 @@
#ifndef SPARKS_H
#define SPARKS_H
-#if defined(GRAN)
+#if defined(PARALLEL_HASKELL) || defined(SMP)
+StgClosure * findSpark (Capability *cap);
+void initSparkPools (void);
+void markSparkQueue (evac_fn evac);
+void createSparkThread (Capability *cap, StgClosure *p);
+StgInt newSpark (StgRegTable *reg, StgClosure *p);
+
+INLINE_HEADER void discardSparks (StgSparkPool *pool);
+INLINE_HEADER nat sparkPoolSize (StgSparkPool *pool);
+INLINE_HEADER rtsBool emptySparkPool (StgSparkPool *pool);
+
+INLINE_HEADER void discardSparksCap (Capability *cap);
+INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
+INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
+#endif
+
+#if defined(PARALLEL_HASKELL)
+StgTSO *activateSpark (rtsSpark spark) ;
+rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool );
+void markSparkQueue( void );
+nat spark_queue_len( StgSparkPool *pool );
+void disposeSpark( StgClosure *spark );
+#endif
+#if defined(GRAN)
void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res);
rtsBool activateSpark (rtsEvent *event, rtsSparkQ spark);
rtsSpark *newSpark(StgClosure *node, nat name, nat gran_info,
@@ -24,19 +47,48 @@ void print_sparkq(PEs proc);
void print_sparkq_stats(void);
nat spark_queue_len(PEs proc);
void markSparkQueue(void);
+#endif
+
+/* -----------------------------------------------------------------------------
+ * PRIVATE below here
+ * -------------------------------------------------------------------------- */
-#elif defined(PAR) || defined(SMP)
+#if defined(PARALLEL_HASKELL) || defined(SMP)
+
+INLINE_HEADER rtsBool
+emptySparkPool (StgSparkPool *pool)
+{
+ return (pool->hd == pool->tl);
+}
+
+INLINE_HEADER rtsBool
+emptySparkPoolCap (Capability *cap)
+{ return emptySparkPool(&cap->r.rSparks); }
+
+INLINE_HEADER nat
+sparkPoolSize (StgSparkPool *pool)
+{
+ if (pool->hd <= pool->tl) {
+ return (pool->tl - pool->hd);
+ } else {
+ return (pool->lim - pool->hd + pool->tl - pool->base);
+ }
+}
+
+INLINE_HEADER nat
+sparkPoolSizeCap (Capability *cap)
+{ return sparkPoolSize(&cap->r.rSparks); }
+
+INLINE_HEADER void
+discardSparks (StgSparkPool *pool)
+{
+ pool->hd = pool->tl;
+}
+
+INLINE_HEADER void
+discardSparksCap (Capability *cap)
+{ discardSparks(&cap->r.rSparks); }
-StgClosure *findSpark( rtsBool );
-void initSparkPools( void );
-void markSparkQueue( void );
-#if defined(PAR)
-StgTSO *activateSpark (rtsSpark spark) ;
-rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool );
-void markSparkQueue( void );
-nat spark_queue_len( StgSparkPool *pool );
-void disposeSpark( StgClosure *spark );
-#endif
#endif
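
sparkPoolSize in Sparks.h above has two cases: unwrapped (hd <= tl, size is tl - hd) and wrapped. A small self-contained check of the two formulas, using array indices instead of pointers (illustrative only):

/* Quick check of the two sparkPoolSize cases on a pool of 8 slots. */
#include <assert.h>

typedef unsigned int nat;
typedef struct { int base, lim, hd, tl; } Idx;   /* indices, not pointers */

static nat size (Idx p)
{
    return (p.hd <= p.tl) ? (nat)(p.tl - p.hd)
                          : (nat)(p.lim - p.hd + p.tl - p.base);
}

int main (void)
{
    Idx unwrapped = { 0, 8, 2, 5 };   /* live slots: 2,3,4           */
    Idx wrapped   = { 0, 8, 6, 1 };   /* live slots: 6,7,0 (wrapped) */
    assert(size(unwrapped) == 3);
    assert(size(wrapped)   == 3);
    return 0;
}
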
diff --git a/ghc/rts/StgCRun.c b/ghc/rts/StgCRun.c
index fc08b50bb3..54ebfe195a 100644
--- a/ghc/rts/StgCRun.c
+++ b/ghc/rts/StgCRun.c
@@ -203,7 +203,8 @@ StgRun(StgFunPtr f, StgRegTable *basereg) {
extern StgRegTable * StgRun(StgFunPtr f, StgRegTable *basereg);
-static void StgRunIsImplementedInAssembler(void)
+void StgRunIsImplementedInAssembler(void);
+void StgRunIsImplementedInAssembler(void)
{
__asm__ volatile (
/*
diff --git a/ghc/rts/StgStartup.cmm b/ghc/rts/StgStartup.cmm
index 3569a390bf..2f2a759c81 100644
--- a/ghc/rts/StgStartup.cmm
+++ b/ghc/rts/StgStartup.cmm
@@ -118,7 +118,7 @@ stg_returnToStackTop
stg_returnToSched
{
SAVE_THREAD_STATE();
- foreign "C" threadPaused(CurrentTSO);
+ foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO);
jump StgReturn;
}
@@ -139,7 +139,7 @@ stg_returnToSchedNotPaused
stg_returnToSchedButFirst
{
SAVE_THREAD_STATE();
- foreign "C" threadPaused(CurrentTSO);
+ foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO);
jump R2;
}
diff --git a/ghc/rts/Updates.cmm b/ghc/rts/Updates.cmm
index 02d182764f..1d2fc5fe0f 100644
--- a/ghc/rts/Updates.cmm
+++ b/ghc/rts/Updates.cmm
@@ -102,6 +102,20 @@ INFO_TABLE_RET( stg_upd_frame,
)
UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0)))
+
+INFO_TABLE_RET( stg_marked_upd_frame,
+ UPD_FRAME_WORDS, UPD_FRAME_BITMAP, UPDATE_FRAME,
+ stg_upd_frame_0_ret,
+ stg_upd_frame_1_ret,
+ stg_upd_frame_2_ret,
+ stg_upd_frame_3_ret,
+ stg_upd_frame_4_ret,
+ stg_upd_frame_5_ret,
+ stg_upd_frame_6_ret,
+ stg_upd_frame_7_ret
+ )
+UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0)))
+
/*-----------------------------------------------------------------------------
Seq frames
diff --git a/ghc/rts/Updates.h b/ghc/rts/Updates.h
index fd48fb8e3d..7b0dc3a3d5 100644
--- a/ghc/rts/Updates.h
+++ b/ghc/rts/Updates.h
@@ -204,16 +204,14 @@ extern void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node);
W_ sz; \
W_ i; \
inf = %GET_STD_INFO(p); \
- if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) { \
- StgThunk_payload(p,0) = 0; \
- } else { \
+ if (%INFO_TYPE(inf) != HALF_W_(THUNK_SELECTOR)) { \
if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) { \
if (%INFO_TYPE(inf) == HALF_W_(AP_STACK)) { \
sz = StgAP_STACK_size(p) + BYTES_TO_WDS(SIZEOF_StgAP_STACK_NoHdr); \
} else { \
sz = TO_W_(%INFO_PTRS(inf)) + TO_W_(%INFO_NPTRS(inf)); \
} \
- i = 0; \
+ i = 1; /* skip over indirectee */ \
for: \
if (i < sz) { \
StgThunk_payload(p,i) = 0; \
@@ -232,20 +230,17 @@ DEBUG_FILL_SLOP(StgClosure *p)
switch (inf->type) {
case BLACKHOLE:
+ case THUNK_SELECTOR:
return;
case AP_STACK:
sz = ((StgAP_STACK *)p)->size + sizeofW(StgAP_STACK) - sizeofW(StgHeader);
break;
- case THUNK_SELECTOR:
-#ifdef SMP
- ((StgSelector *)p)->selectee = 0;
-#endif
- return;
default:
sz = inf->layout.payload.ptrs + inf->layout.payload.nptrs;
break;
}
- for (i = 0; i < sz; i++) {
+ // start at one to skip over the indirectee
+ for (i = 1; i < sz; i++) {
((StgThunk *)p)->payload[i] = 0;
}
}
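
The Updates.h changes above make slop-filling skip the first payload word: once a thunk has been overwritten by an update, payload[0] holds the indirectee and must be preserved, so zeroing starts at index 1. A minimal sketch of the idea (not the actual RTS macro; the names here are illustrative):

/* Sketch only: zero a thunk's slop after it has been updated, keeping
 * payload[0] (the indirectee) intact, as DEBUG_FILL_SLOP now does. */
typedef unsigned long W_;

static void
fill_slop (W_ *payload, unsigned int payload_size)
{
    unsigned int i;
    for (i = 1; i < payload_size; i++) {   /* start at 1: skip the indirectee */
        payload[i] = 0;
    }
}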