-rw-r--r-- | ghc/includes/Closures.h | 5
-rw-r--r-- | ghc/includes/Regs.h | 32
-rw-r--r-- | ghc/includes/RtsFlags.h | 1
-rw-r--r-- | ghc/includes/StgMiscClosures.h | 2
-rw-r--r-- | ghc/includes/Storage.h | 2
-rw-r--r-- | ghc/rts/Capability.c | 5
-rw-r--r-- | ghc/rts/GC.c | 226
-rw-r--r-- | ghc/rts/Interpreter.c | 2
-rw-r--r-- | ghc/rts/RtsFlags.c | 5
-rw-r--r-- | ghc/rts/Schedule.c | 321
-rw-r--r-- | ghc/rts/Schedule.h | 8
-rw-r--r-- | ghc/rts/Sparks.c | 388
-rw-r--r-- | ghc/rts/Sparks.h | 76
-rw-r--r-- | ghc/rts/StgCRun.c | 3
-rw-r--r-- | ghc/rts/StgStartup.cmm | 4
-rw-r--r-- | ghc/rts/Updates.cmm | 14
-rw-r--r-- | ghc/rts/Updates.h | 15
17 files changed, 618 insertions, 491 deletions
diff --git a/ghc/includes/Closures.h b/ghc/includes/Closures.h index f9bfeb4dc1..df3bca32f7 100644 --- a/ghc/includes/Closures.h +++ b/ghc/includes/Closures.h @@ -339,6 +339,11 @@ typedef struct { * - In StgTRecHeader, it might be worthwhile having separate chunks * of read-only and read-write locations. This would save a * new_value field in the read-only locations. + * + * - In StgAtomicallyFrame, we could combine the waiting bit into + * the header (maybe a different info tbl for a waiting transaction). + * This means we can specialise the code for the atomically frame + * (it immediately switches on frame->waiting anyway). */ typedef struct StgTVarWaitQueue_ { diff --git a/ghc/includes/Regs.h b/ghc/includes/Regs.h index f1b8597d60..def36c327d 100644 --- a/ghc/includes/Regs.h +++ b/ghc/includes/Regs.h @@ -24,15 +24,17 @@ #include "gmp.h" // Needs MP_INT definition -/* - * This is the table that holds shadow-locations for all the STG - * registers. The shadow locations are used when: - * - * 1) the particular register isn't mapped to a real machine - * register, probably because there's a shortage of real registers. - * 2) caller-saves registers are saved across a CCall +/* + * Spark pools: used to store pending sparks (SMP & PARALLEL_HASKELL only) + * This is a circular buffer. Invariants: + * - base <= hd < lim + * - base <= tl < lim + * - if hd==tl, then the pool is empty. + * - if hd == tl+1, then the pool is full. + * Adding to the pool is done by assigning to *tl++ (wrapping round as + * necessary). When adding to a full pool, we have the option of + * throwing away either the oldest (hd++) or the most recent (tl--) entry. */ - typedef struct StgSparkPool_ { StgClosure **base; StgClosure **lim; @@ -40,6 +42,12 @@ typedef struct StgSparkPool_ { StgClosure **tl; } StgSparkPool; +#define ASSERT_SPARK_POOL_INVARIANTS(p) \ + ASSERT((p)->base <= (p)->hd); \ + ASSERT((p)->hd < (p)->lim); \ + ASSERT((p)->base <= (p)->tl); \ + ASSERT((p)->tl < (p)->lim); + typedef struct { StgFunPtr stgGCEnter1; StgFunPtr stgGCFun; @@ -64,6 +72,14 @@ typedef union { StgTSOPtr t; } StgUnion; +/* + * This is the table that holds shadow-locations for all the STG + * registers. The shadow locations are used when: + * + * 1) the particular register isn't mapped to a real machine + * register, probably because there's a shortage of real registers. 
+ * 2) caller-saves registers are saved across a CCall + */ typedef struct StgRegTable_ { StgUnion rR1; StgUnion rR2; diff --git a/ghc/includes/RtsFlags.h b/ghc/includes/RtsFlags.h index a4c6d9ba58..4a37d48fd4 100644 --- a/ghc/includes/RtsFlags.h +++ b/ghc/includes/RtsFlags.h @@ -62,6 +62,7 @@ struct DEBUG_FLAGS { rtsBool linker; /* 'l' the object linker */ rtsBool apply; /* 'a' */ rtsBool stm; /* 'm' */ + rtsBool squeeze; /* 'z' stack squeezing & lazy blackholing */ }; struct COST_CENTRE_FLAGS { diff --git a/ghc/includes/StgMiscClosures.h b/ghc/includes/StgMiscClosures.h index 1d381e656e..a9938aa9f4 100644 --- a/ghc/includes/StgMiscClosures.h +++ b/ghc/includes/StgMiscClosures.h @@ -37,6 +37,7 @@ /* Stack frames */ RTS_RET_INFO(stg_upd_frame_info); +RTS_RET_INFO(stg_marked_upd_frame_info); RTS_RET_INFO(stg_noupd_frame_info); RTS_RET_INFO(stg_seq_frame_info); RTS_RET_INFO(stg_catch_frame_info); @@ -45,6 +46,7 @@ RTS_RET_INFO(stg_atomically_frame_info); RTS_RET_INFO(stg_catch_stm_frame_info); RTS_ENTRY(stg_upd_frame_ret); +RTS_ENTRY(stg_marked_upd_frame_ret); RTS_ENTRY(stg_seq_frame_ret); /* Entry code for constructors created by the bytecode interpreter */ diff --git a/ghc/includes/Storage.h b/ghc/includes/Storage.h index 1f6ef3f5e7..e37c50d054 100644 --- a/ghc/includes/Storage.h +++ b/ghc/includes/Storage.h @@ -392,7 +392,7 @@ extern lnat countNurseryBlocks ( void ); Functions from GC.c -------------------------------------------------------------------------- */ -extern void threadPaused ( StgTSO * ); +extern void threadPaused ( Capability *cap, StgTSO * ); extern StgClosure * isAlive ( StgClosure *p ); extern void markCAFs ( evac_fn evac ); diff --git a/ghc/rts/Capability.c b/ghc/rts/Capability.c index d08bf23339..5872f427aa 100644 --- a/ghc/rts/Capability.c +++ b/ghc/rts/Capability.c @@ -23,6 +23,7 @@ #include "OSThreads.h" #include "Capability.h" #include "Schedule.h" +#include "Sparks.h" #if !defined(SMP) Capability MainCapability; // for non-SMP, we have one global capability @@ -74,6 +75,8 @@ anyWorkForMe( Capability *cap, Task *task ) } else { return (cap->run_queue_hd->bound == task); } + } else if (task->tso == NULL && !emptySparkPoolCap(cap)) { + return rtsTrue; } return globalWorkToDo(); } @@ -263,7 +266,7 @@ releaseCapability_ (Capability* cap) // If we have an unbound thread on the run queue, or if there's // anything else to do, give the Capability to a worker thread. - if (!emptyRunQueue(cap) || globalWorkToDo()) { + if (!emptyRunQueue(cap) || !emptySparkPoolCap(cap) || globalWorkToDo()) { if (cap->spare_workers) { giveCapabilityToTask(cap,cap->spare_workers); // The worker Task pops itself from the queue; diff --git a/ghc/rts/GC.c b/ghc/rts/GC.c index 0399d6034e..bc8546a115 100644 --- a/ghc/rts/GC.c +++ b/ghc/rts/GC.c @@ -4250,74 +4250,6 @@ gcCAFs(void) /* ----------------------------------------------------------------------------- - Lazy black holing. - - Whenever a thread returns to the scheduler after possibly doing - some work, we have to run down the stack and black-hole all the - closures referred to by update frames. 
- -------------------------------------------------------------------------- */ - -static void -threadLazyBlackHole(StgTSO *tso) -{ - StgClosure *frame; - StgRetInfoTable *info; - StgClosure *bh; - StgPtr stack_end; - - stack_end = &tso->stack[tso->stack_size]; - - frame = (StgClosure *)tso->sp; - - while (1) { - info = get_ret_itbl(frame); - - switch (info->i.type) { - - case UPDATE_FRAME: - bh = ((StgUpdateFrame *)frame)->updatee; - - /* if the thunk is already blackholed, it means we've also - * already blackholed the rest of the thunks on this stack, - * so we can stop early. - * - * The blackhole made for a CAF is a CAF_BLACKHOLE, so they - * don't interfere with this optimisation. - */ - if (bh->header.info == &stg_BLACKHOLE_info) { - return; - } - - if (bh->header.info != &stg_CAF_BLACKHOLE_info) { -#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG) - debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh); -#endif -#ifdef PROFILING - // @LDV profiling - // We pretend that bh is now dead. - LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh); -#endif - SET_INFO(bh,&stg_BLACKHOLE_info); - - // We pretend that bh has just been created. - LDV_RECORD_CREATE(bh); - } - - frame = (StgClosure *) ((StgUpdateFrame *)frame + 1); - break; - - case STOP_FRAME: - return; - - // normal stack frames; do nothing except advance the pointer - default: - frame = (StgClosure *)((StgPtr)frame + stack_frame_sizeW(frame)); - } - } -} - - -/* ----------------------------------------------------------------------------- * Stack squeezing * * Code largely pinched from old RTS, then hacked to bits. We also do @@ -4328,12 +4260,11 @@ threadLazyBlackHole(StgTSO *tso) struct stack_gap { StgWord gap_size; struct stack_gap *next_gap; }; static void -threadSqueezeStack(StgTSO *tso) +stackSqueeze(StgTSO *tso, StgPtr bottom) { StgPtr frame; rtsBool prev_was_update_frame; StgClosure *updatee = NULL; - StgPtr bottom; StgRetInfoTable *info; StgWord current_gap_size; struct stack_gap *gap; @@ -4344,8 +4275,6 @@ threadSqueezeStack(StgTSO *tso) // contains two values: the size of the gap, and the distance // to the next gap (or the stack top). - bottom = &(tso->stack[tso->stack_size]); - frame = tso->sp; ASSERT(frame < bottom); @@ -4363,20 +4292,6 @@ threadSqueezeStack(StgTSO *tso) { StgUpdateFrame *upd = (StgUpdateFrame *)frame; - if (upd->updatee->header.info == &stg_BLACKHOLE_info) { - - // found a BLACKHOLE'd update frame; we've been here - // before, in a previous GC, so just break out. - - // Mark the end of the gap, if we're in one. - if (current_gap_size != 0) { - gap = (struct stack_gap *)(frame-sizeofW(StgUpdateFrame)); - } - - frame += sizeofW(StgUpdateFrame); - goto done_traversing; - } - if (prev_was_update_frame) { TICK_UPD_SQUEEZED(); @@ -4409,31 +4324,6 @@ threadSqueezeStack(StgTSO *tso) // single update frame, or the topmost update frame in a series else { - StgClosure *bh = upd->updatee; - - // Do lazy black-holing - if (bh->header.info != &stg_BLACKHOLE_info && - bh->header.info != &stg_CAF_BLACKHOLE_info) { -#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG) - debugBelch("Unexpected lazy BHing required at 0x%04lx",(long)bh); -#endif -#ifdef DEBUG - // zero out the slop so that the sanity checker can tell - // where the next closure is. - DEBUG_FILL_SLOP(bh); -#endif -#ifdef PROFILING - // We pretend that bh is now dead. - // ToDo: is the slop filling the same as DEBUG_FILL_SLOP? 
- LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh); -#endif - // Todo: maybe use SET_HDR() and remove LDV_RECORD_CREATE()? - SET_INFO(bh,&stg_BLACKHOLE_info); - - // We pretend that bh has just been created. - LDV_RECORD_CREATE(bh); - } - prev_was_update_frame = rtsTrue; updatee = upd->updatee; frame += sizeofW(StgUpdateFrame); @@ -4456,8 +4346,10 @@ threadSqueezeStack(StgTSO *tso) } } -done_traversing: - + if (current_gap_size != 0) { + gap = (struct stack_gap *) (frame - sizeofW(StgUpdateFrame)); + } + // Now we have a stack with gaps in it, and we have to walk down // shoving the stack up to fill in the gaps. A diagram might // help: @@ -4515,12 +4407,110 @@ done_traversing: * turned on. * -------------------------------------------------------------------------- */ void -threadPaused(StgTSO *tso) +threadPaused(Capability *cap, StgTSO *tso) { - if ( RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue ) - threadSqueezeStack(tso); // does black holing too - else - threadLazyBlackHole(tso); + StgClosure *frame; + StgRetInfoTable *info; + StgClosure *bh; + StgPtr stack_end; + nat words_to_squeeze = 0; + nat weight = 0; + nat weight_pending = 0; + rtsBool prev_was_update_frame; + + stack_end = &tso->stack[tso->stack_size]; + + frame = (StgClosure *)tso->sp; + + while (1) { + // If we've already marked this frame, then stop here. + if (frame->header.info == (StgInfoTable *)&stg_marked_upd_frame_info) { + goto end; + } + + info = get_ret_itbl(frame); + + switch (info->i.type) { + + case UPDATE_FRAME: + + SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info); + + bh = ((StgUpdateFrame *)frame)->updatee; + + if (closure_IND(bh) || bh->header.info == &stg_BLACKHOLE_info) { + IF_DEBUG(squeeze, debugBelch("suspending duplicate work: %d words of stack\n", (StgPtr)frame - tso->sp)); + + // If this closure is already an indirection, then + // suspend the computation up to this point: + suspendComputation(cap,tso,(StgPtr)frame); + + // Now drop the update frame, and arrange to return + // the value to the frame underneath: + tso->sp = (StgPtr)frame + sizeofW(StgUpdateFrame) - 2; + tso->sp[1] = (StgWord)bh; + tso->sp[0] = (W_)&stg_enter_info; + + // And continue with threadPaused; there might be + // yet more computation to suspend. + threadPaused(cap,tso); + return; + } + + if (bh->header.info != &stg_CAF_BLACKHOLE_info) { +#if (!defined(LAZY_BLACKHOLING)) && defined(DEBUG) + debugBelch("Unexpected lazy BHing required at 0x%04lx\n",(long)bh); +#endif + // zero out the slop so that the sanity checker can tell + // where the next closure is. + DEBUG_FILL_SLOP(bh); +#ifdef PROFILING + // @LDV profiling + // We pretend that bh is now dead. + LDV_recordDead_FILL_SLOP_DYNAMIC((StgClosure *)bh); +#endif + SET_INFO(bh,&stg_BLACKHOLE_info); + + // We pretend that bh has just been created. + LDV_RECORD_CREATE(bh); + } + + frame = (StgClosure *) ((StgUpdateFrame *)frame + 1); + if (prev_was_update_frame) { + words_to_squeeze += sizeofW(StgUpdateFrame); + weight += weight_pending; + weight_pending = 0; + } + prev_was_update_frame = rtsTrue; + break; + + case STOP_FRAME: + goto end; + + // normal stack frames; do nothing except advance the pointer + default: + { + nat frame_size = stack_frame_sizeW(frame); + weight_pending += frame_size; + frame = (StgClosure *)((StgPtr)frame + frame_size); + prev_was_update_frame = rtsFalse; + } + } + } + +end: + IF_DEBUG(squeeze, + debugBelch("words_to_squeeze: %d, weight: %d, squeeze: %s\n", + words_to_squeeze, weight, + weight < words_to_squeeze ? 
"YES" : "NO")); + + // Should we squeeze or not? Arbitrary heuristic: we squeeze if + // the number of words we have to shift down is less than the + // number of stack words we squeeze away by doing so. + if (1 /*RtsFlags.GcFlags.squeezeUpdFrames == rtsTrue && + weight < words_to_squeeze*/) { + stackSqueeze(tso, (StgPtr)frame); + } } /* ----------------------------------------------------------------------------- diff --git a/ghc/rts/Interpreter.c b/ghc/rts/Interpreter.c index f007c4abd9..b31ade08fb 100644 --- a/ghc/rts/Interpreter.c +++ b/ghc/rts/Interpreter.c @@ -57,7 +57,7 @@ #define RETURN_TO_SCHEDULER(todo,retcode) \ SAVE_STACK_POINTERS; \ cap->r.rCurrentTSO->what_next = (todo); \ - threadPaused(cap->r.rCurrentTSO); \ + threadPaused(cap,cap->r.rCurrentTSO); \ cap->r.rRet = (retcode); \ return cap; diff --git a/ghc/rts/RtsFlags.c b/ghc/rts/RtsFlags.c index f086368a16..25d53da1fe 100644 --- a/ghc/rts/RtsFlags.c +++ b/ghc/rts/RtsFlags.c @@ -190,6 +190,7 @@ void initRtsFlagsDefaults(void) RtsFlags.DebugFlags.gran = rtsFalse; RtsFlags.DebugFlags.par = rtsFalse; RtsFlags.DebugFlags.linker = rtsFalse; + RtsFlags.DebugFlags.squeeze = rtsFalse; #endif #if defined(PROFILING) || defined(PAR) @@ -431,6 +432,7 @@ usage_text[] = { " -DP DEBUG: par", " -Dl DEBUG: linker", " -Dm DEBUG: stm", +" -Dz DEBUG: stack squezing", "", #endif /* DEBUG */ #if defined(SMP) @@ -726,6 +728,9 @@ error = rtsTrue; case 'm': RtsFlags.DebugFlags.stm = rtsTrue; break; + case 'z': + RtsFlags.DebugFlags.squeeze = rtsTrue; + break; default: bad_option( rts_argv[arg] ); } diff --git a/ghc/rts/Schedule.c b/ghc/rts/Schedule.c index 299e132548..4891bbf9dd 100644 --- a/ghc/rts/Schedule.c +++ b/ghc/rts/Schedule.c @@ -250,7 +250,7 @@ static void AllRoots(evac_fn evac); static StgTSO *threadStackOverflow(Capability *cap, StgTSO *tso); static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically); + rtsBool stop_at_atomically, StgPtr stop_here); static void deleteThread (Capability *cap, StgTSO *tso); static void deleteRunQueue (Capability *cap); @@ -396,7 +396,7 @@ schedule (Capability *initialCapability, Task *task) #ifdef SMP schedulePushWork(cap,task); -#endif +#endif // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -415,6 +415,9 @@ schedule (Capability *initialCapability, Task *task) // if (interrupted) { deleteRunQueue(cap); +#if defined(SMP) + discardSparksCap(cap); +#endif if (shutting_down_scheduler) { IF_DEBUG(scheduler, sched_belch("shutting down")); // If we are a worker, just exit. If we're a bound thread @@ -428,23 +431,17 @@ schedule (Capability *initialCapability, Task *task) } } -#if defined(not_yet) && defined(SMP) - // - // Top up the run queue from our spark pool. We try to make the - // number of threads in the run queue equal to the number of - // free capabilities. - // +#if defined(SMP) + // If the run queue is empty, take a spark and turn it into a thread. 
{ - StgClosure *spark; - if (emptyRunQueue()) { - spark = findSpark(rtsFalse); - if (spark == NULL) { - break; /* no more sparks in the pool */ - } else { - createSparkThread(spark); + if (emptyRunQueue(cap)) { + StgClosure *spark; + spark = findSpark(cap); + if (spark != NULL) { IF_DEBUG(scheduler, - sched_belch("==^^ turning spark of closure %p into a thread", + sched_belch("turning spark of closure %p into a thread", (StgClosure *)spark)); + createSparkThread(cap,spark); } } } @@ -739,9 +736,10 @@ schedulePushWork(Capability *cap USED_WHEN_SMP, Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; - // Check whether we have more threads on our run queue that we - // could hand to another Capability. - if (emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) { + // Check whether we have more threads on our run queue, or sparks + // in our pool, that we could hand to another Capability. + if ((emptyRunQueue(cap) || cap->run_queue_hd->link == END_TSO_QUEUE) + && sparkPoolSizeCap(cap) < 2) { return; } @@ -772,31 +770,54 @@ schedulePushWork(Capability *cap USED_WHEN_SMP, if (n_free_caps > 0) { StgTSO *prev, *t, *next; + rtsBool pushed_to_all; + IF_DEBUG(scheduler, sched_belch("excess threads on run queue and %d free capabilities, sharing...", n_free_caps)); - prev = cap->run_queue_hd; - t = prev->link; - prev->link = END_TSO_QUEUE; i = 0; - for (; t != END_TSO_QUEUE; t = next) { - next = t->link; - t->link = END_TSO_QUEUE; - if (t->what_next == ThreadRelocated - || t->bound == task) { // don't move my bound thread - prev->link = t; - prev = t; - } else if (i == n_free_caps) { - i = 0; - // keep one for us - prev->link = t; - prev = t; - } else { - appendToRunQueue(free_caps[i],t); - if (t->bound) { t->bound->cap = free_caps[i]; } - i++; + pushed_to_all = rtsFalse; + + if (cap->run_queue_hd != END_TSO_QUEUE) { + prev = cap->run_queue_hd; + t = prev->link; + prev->link = END_TSO_QUEUE; + for (; t != END_TSO_QUEUE; t = next) { + next = t->link; + t->link = END_TSO_QUEUE; + if (t->what_next == ThreadRelocated + || t->bound == task) { // don't move my bound thread + prev->link = t; + prev = t; + } else if (i == n_free_caps) { + pushed_to_all = rtsTrue; + i = 0; + // keep one for us + prev->link = t; + prev = t; + } else { + appendToRunQueue(free_caps[i],t); + if (t->bound) { t->bound->cap = free_caps[i]; } + i++; + } + } + cap->run_queue_tl = prev; + } + + // If there are some free capabilities that we didn't push any + // threads to, then try to push a spark to each one. 
+ if (!pushed_to_all) { + StgClosure *spark; + // i is the next free capability to push to + for (; i < n_free_caps; i++) { + if (emptySparkPoolCap(free_caps[i])) { + spark = findSpark(cap); + if (spark != NULL) { + IF_DEBUG(scheduler, sched_belch("pushing spark %p to capability %d", spark, free_caps[i]->no)); + newSpark(&(free_caps[i]->r), spark); + } + } } } - cap->run_queue_tl = prev; // release the capabilities for (i = 0; i < n_free_caps; i++) { @@ -812,15 +833,20 @@ schedulePushWork(Capability *cap USED_WHEN_SMP, * Start any pending signal handlers * ------------------------------------------------------------------------- */ +#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS)) static void scheduleStartSignalHandlers(Capability *cap) { -#if defined(RTS_USER_SIGNALS) && (!defined(THREADED_RTS) || defined(mingw32_HOST_OS)) if (signals_pending()) { // safe outside the lock startSignalHandlers(cap); } -#endif } +#else +static void +scheduleStartSignalHandlers(Capability *cap STG_UNUSED) +{ +} +#endif /* ---------------------------------------------------------------------------- * Check for blocked threads that can be woken up. @@ -1926,7 +1952,7 @@ scheduleDoGC( Capability *cap, Task *task USED_WHEN_SMP, rtsBool force_major ) // ATOMICALLY_FRAME, aborting the (nested) // transaction, and saving the stack of any // partially-evaluated thunks on the heap. - raiseAsync_(cap, t, NULL, rtsTrue); + raiseAsync_(cap, t, NULL, rtsTrue, NULL); #ifdef REG_R1 ASSERT(get_itbl((StgClosure *)t->sp)->type == ATOMICALLY_FRAME); @@ -2165,7 +2191,7 @@ suspendThread (StgRegTable *reg) // XXX this might not be necessary --SDM tso->what_next = ThreadRunGHC; - threadPaused(tso); + threadPaused(cap,tso); if(tso->blocked_exceptions == NULL) { tso->why_blocked = BlockedOnCCall; @@ -2660,6 +2686,10 @@ initScheduler(void) initTaskManager(); +#if defined(SMP) || defined(PARALLEL_HASKELL) + initSparkPools(); +#endif + #if defined(SMP) /* * Eagerly start one worker to run each Capability, except for @@ -2679,10 +2709,6 @@ initScheduler(void) } #endif -#if /* defined(SMP) ||*/ defined(PARALLEL_HASKELL) - initSparkPools(); -#endif - RELEASE_LOCK(&sched_mutex); } @@ -2772,7 +2798,7 @@ GetRoots( evac_fn evac ) evac((StgClosure **)&blackhole_queue); -#if defined(PARALLEL_HASKELL) || defined(GRAN) +#if defined(SMP) || defined(PARALLEL_HASKELL) || defined(GRAN) markSparkQueue(evac); #endif @@ -3607,15 +3633,22 @@ checkBlackHoles (Capability *cap) void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception) { - raiseAsync_(cap, tso, exception, rtsFalse); + raiseAsync_(cap, tso, exception, rtsFalse, NULL); +} + +void +suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here) +{ + raiseAsync_(cap, tso, NULL, rtsFalse, stop_here); } static void raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, - rtsBool stop_at_atomically) + rtsBool stop_at_atomically, StgPtr stop_here) { StgRetInfoTable *info; - StgPtr sp; + StgPtr sp, frame; + nat i; // Thread already dead? if (tso->what_next == ThreadComplete || tso->what_next == ThreadKilled) { @@ -3640,8 +3673,8 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, sp[0] = (W_)&stg_dummy_ret_closure; } - while (1) { - nat i; + frame = sp + 1; + while (stop_here == NULL || frame < stop_here) { // 1. 
Let the top of the stack be the "current closure" // @@ -3661,95 +3694,10 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, // NB: if we pass an ATOMICALLY_FRAME then abort the associated // transaction - - StgPtr frame; - - frame = sp + 1; info = get_ret_itbl((StgClosure *)frame); - - while (info->i.type != UPDATE_FRAME - && (info->i.type != CATCH_FRAME || exception == NULL) - && info->i.type != STOP_FRAME - && (info->i.type != ATOMICALLY_FRAME || stop_at_atomically == rtsFalse)) - { - if (info->i.type == CATCH_RETRY_FRAME || info->i.type == ATOMICALLY_FRAME) { - // IF we find an ATOMICALLY_FRAME then we abort the - // current transaction and propagate the exception. In - // this case (unlike ordinary exceptions) we do not care - // whether the transaction is valid or not because its - // possible validity cannot have caused the exception - // and will not be visible after the abort. - IF_DEBUG(stm, - debugBelch("Found atomically block delivering async exception\n")); - stmAbortTransaction(tso -> trec); - tso -> trec = stmGetEnclosingTRec(tso -> trec); - } - frame += stack_frame_sizeW((StgClosure *)frame); - info = get_ret_itbl((StgClosure *)frame); - } - + switch (info->i.type) { - - case ATOMICALLY_FRAME: - ASSERT(stop_at_atomically); - ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); - stmCondemnTransaction(tso -> trec); -#ifdef REG_R1 - tso->sp = frame; -#else - // R1 is not a register: the return convention for IO in - // this case puts the return value on the stack, so we - // need to set up the stack to return to the atomically - // frame properly... - tso->sp = frame - 2; - tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? - tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; -#endif - tso->what_next = ThreadRunGHC; - return; - case CATCH_FRAME: - // If we find a CATCH_FRAME, and we've got an exception to raise, - // then build the THUNK raise(exception), and leave it on - // top of the CATCH_FRAME ready to enter. - // - { -#ifdef PROFILING - StgCatchFrame *cf = (StgCatchFrame *)frame; -#endif - StgThunk *raise; - - // we've got an exception to raise, so let's pass it to the - // handler in this frame. - // - raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); - TICK_ALLOC_SE_THK(1,0); - SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); - raise->payload[0] = exception; - - // throw away the stack from Sp up to the CATCH_FRAME. - // - sp = frame - 1; - - /* Ensure that async excpetions are blocked now, so we don't get - * a surprise exception before we get around to executing the - * handler. - */ - if (tso->blocked_exceptions == NULL) { - tso->blocked_exceptions = END_TSO_QUEUE; - } - - /* Put the newly-built THUNK on top of the stack, ready to execute - * when the thread restarts. - */ - sp[0] = (W_)raise; - sp[-1] = (W_)&stg_enter_info; - tso->sp = sp-1; - tso->what_next = ThreadRunGHC; - IF_DEBUG(sanity, checkTSO(tso)); - return; - } - case UPDATE_FRAME: { StgAP_STACK * ap; @@ -3780,9 +3728,7 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, printObj((StgClosure *)ap); ); - // Replace the updatee with an indirection - happily - // this will also wake up any threads currently - // waiting on the result. + // Replace the updatee with an indirection // // Warning: if we're in a loop, more than one update frame on // the stack may point to the same object. 
Be careful not to @@ -3799,20 +3745,104 @@ raiseAsync_(Capability *cap, StgTSO *tso, StgClosure *exception, } sp += sizeofW(StgUpdateFrame) - 1; sp[0] = (W_)ap; // push onto stack + frame = sp + 1; break; } - + case STOP_FRAME: // We've stripped the entire stack, the thread is now dead. tso->what_next = ThreadKilled; tso->sp = frame + sizeofW(StgStopFrame); return; + + case CATCH_FRAME: + // If we find a CATCH_FRAME, and we've got an exception to raise, + // then build the THUNK raise(exception), and leave it on + // top of the CATCH_FRAME ready to enter. + // + { +#ifdef PROFILING + StgCatchFrame *cf = (StgCatchFrame *)frame; +#endif + StgThunk *raise; + + if (exception == NULL) break; + + // we've got an exception to raise, so let's pass it to the + // handler in this frame. + // + raise = (StgThunk *)allocateLocal(cap,sizeofW(StgThunk)+MIN_UPD_SIZE); + TICK_ALLOC_SE_THK(1,0); + SET_HDR(raise,&stg_raise_info,cf->header.prof.ccs); + raise->payload[0] = exception; + + // throw away the stack from Sp up to the CATCH_FRAME. + // + sp = frame - 1; + + /* Ensure that async excpetions are blocked now, so we don't get + * a surprise exception before we get around to executing the + * handler. + */ + if (tso->blocked_exceptions == NULL) { + tso->blocked_exceptions = END_TSO_QUEUE; + } + + /* Put the newly-built THUNK on top of the stack, ready to execute + * when the thread restarts. + */ + sp[0] = (W_)raise; + sp[-1] = (W_)&stg_enter_info; + tso->sp = sp-1; + tso->what_next = ThreadRunGHC; + IF_DEBUG(sanity, checkTSO(tso)); + return; + } + + case ATOMICALLY_FRAME: + if (stop_at_atomically) { + ASSERT(stmGetEnclosingTRec(tso->trec) == NO_TREC); + stmCondemnTransaction(tso -> trec); +#ifdef REG_R1 + tso->sp = frame; +#else + // R1 is not a register: the return convention for IO in + // this case puts the return value on the stack, so we + // need to set up the stack to return to the atomically + // frame properly... + tso->sp = frame - 2; + tso->sp[1] = (StgWord) &stg_NO_FINALIZER_closure; // why not? + tso->sp[0] = (StgWord) &stg_ut_1_0_unreg_info; +#endif + tso->what_next = ThreadRunGHC; + return; + } + // Not stop_at_atomically... fall through and abort the + // transaction. + + case CATCH_RETRY_FRAME: + // IF we find an ATOMICALLY_FRAME then we abort the + // current transaction and propagate the exception. In + // this case (unlike ordinary exceptions) we do not care + // whether the transaction is valid or not because its + // possible validity cannot have caused the exception + // and will not be visible after the abort. 
+ IF_DEBUG(stm, + debugBelch("Found atomically block delivering async exception\n")); + stmAbortTransaction(tso -> trec); + tso -> trec = stmGetEnclosingTRec(tso -> trec); + break; default: - barf("raiseAsync"); + break; } + + // move on to the next stack frame + frame += stack_frame_sizeW((StgClosure *)frame); } - barf("raiseAsync"); + + // if we got here, then we stopped at stop_here + ASSERT(stop_here != NULL); } /* ----------------------------------------------------------------------------- @@ -4156,6 +4186,7 @@ printAllThreads(void) } } + debugBelch("other threads:\n"); for (t = all_threads; t != END_TSO_QUEUE; t = next) { if (t->why_blocked != NotBlocked) { printThreadStatus(t); diff --git a/ghc/rts/Schedule.h b/ghc/rts/Schedule.h index 9335e594e6..1626852997 100644 --- a/ghc/rts/Schedule.h +++ b/ghc/rts/Schedule.h @@ -55,6 +55,14 @@ StgTSO * unblockOne(Capability *cap, StgTSO *tso); */ void raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception); +/* suspendComputation() + * + * A variant of raiseAsync(), this strips the stack of the specified + * thread down to the stop_here point, leaving a current closure on + * top of the stack at [stop_here - 1]. + */ +void suspendComputation(Capability *cap, StgTSO *tso, StgPtr stop_here); + /* raiseExceptionHelper */ StgWord raiseExceptionHelper (StgRegTable *reg, StgTSO *tso, StgClosure *exception); diff --git a/ghc/rts/Sparks.c b/ghc/rts/Sparks.c index 6e638d4106..12af296380 100644 --- a/ghc/rts/Sparks.c +++ b/ghc/rts/Sparks.c @@ -1,24 +1,11 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2000 + * (c) The GHC Team, 2000-2005 * - * Sparking support for PAR and SMP versions of the RTS. + * Sparking support for PARALLEL_HASKELL and SMP versions of the RTS. * * -------------------------------------------------------------------------*/ -//@node Spark Management Routines, , , -//@section Spark Management Routines - -//@menu -//* Includes:: -//* GUM code:: -//* GranSim code:: -//@end menu -//*/ - -//@node Includes, GUM code, Spark Management Routines, Spark Management Routines -//@subsection Includes - #include "PosixSource.h" #include "Rts.h" #include "Schedule.h" @@ -27,7 +14,7 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "ParTicky.h" -# if defined(PAR) +# if defined(PARALLEL_HASKELL) # include "ParallelRts.h" # include "GranSimRts.h" // for GR_... # elif defined(GRAN) @@ -35,92 +22,216 @@ # endif #include "Sparks.h" -#if /*defined(SMP) ||*/ defined(PAR) +#if defined(SMP) || defined(PARALLEL_HASKELL) -//@node GUM code, GranSim code, Includes, Spark Management Routines -//@subsection GUM code +static INLINE_ME void bump_hd (StgSparkPool *p) +{ p->hd++; if (p->hd == p->lim) p->hd = p->base; } -static void slide_spark_pool( StgSparkPool *pool ); +static INLINE_ME void bump_tl (StgSparkPool *p) +{ p->tl++; if (p->tl == p->lim) p->tl = p->base; } -void -initSparkPools( void ) -{ - Capability *cap; - StgSparkPool *pool; +/* ----------------------------------------------------------------------------- + * + * Initialising spark pools. 
+ * + * -------------------------------------------------------------------------- */ -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - +static void +initSparkPool(StgSparkPool *pool) +{ pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks - * sizeof(StgClosure *), - "initSparkPools"); + * sizeof(StgClosure *), + "initSparkPools"); pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks; pool->hd = pool->base; pool->tl = pool->base; - } } -/* - We traverse the spark pool until we find the 2nd usable (i.e. non-NF) - spark. Rationale, we don't want to give away the only work a PE has. - ToDo: introduce low- and high-water-marks for load balancing. -*/ -StgClosure * -findSpark( rtsBool for_export ) +void +initSparkPools( void ) { - Capability *cap; - StgSparkPool *pool; - StgClosure *spark, *first=NULL; - rtsBool isIdlePE = EMPTY_RUN_QUEUE(); - #ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { + /* walk over the capabilities, allocating a spark pool for each one */ + nat i; + for (i = 0; i < n_capabilities; i++) { + initSparkPool(&capabilities[i].r.rSparks); + } #else - /* allocate a single spark pool */ - cap = &MainRegTable; - { + /* allocate a single spark pool */ + initSparkPool(&MainCapability.r.rSparks); #endif - pool = &(cap->rSparks); - while (pool->hd < pool->tl) { - spark = *pool->hd++; - if (closure_SHOULD_SPARK(spark)) { - if (for_export && isIdlePE) { - if (first==NULL) { - first = spark; // keep the first usable spark if PE is idle - } else { - pool->hd--; // found a second spark; keep it in the pool - ASSERT(*pool->hd==spark); +} + +/* ----------------------------------------------------------------------------- + * + * findSpark: find a spark on the current Capability that we can fork + * into a thread. + * + * -------------------------------------------------------------------------- */ + +StgClosure * +findSpark (Capability *cap) +{ + StgSparkPool *pool; + StgClosure *spark; + + pool = &(cap->r.rSparks); + ASSERT_SPARK_POOL_INVARIANTS(pool); + + while (pool->hd != pool->tl) { + spark = *pool->hd; + bump_hd(pool); + if (closure_SHOULD_SPARK(spark)) { +#ifdef GRAN if (RtsFlags.ParFlags.ParStats.Sparks) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), first, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - return first; // and return the *first* spark found - } - } else { - if (RtsFlags.ParFlags.ParStats.Sparks && for_export) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), spark, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); - return spark; // return first spark found + DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, + GR_STEALING, ((StgTSO *)NULL), spark, + 0, 0 /* spark_queue_len(ADVISORY_POOL) */); +#endif + return spark; } - } } - slide_spark_pool(pool); - } - return NULL; + // spark pool is now empty + return NULL; } -/* - activateSpark is defined in Schedule.c -*/ +/* ----------------------------------------------------------------------------- + * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an + * implicit slide i.e. 
after marking all sparks are at the beginning of the + * spark pool and the spark pool only contains sparkable closures + * -------------------------------------------------------------------------- */ + +void +markSparkQueue (evac_fn evac) +{ + StgClosure **sparkp, **to_sparkp; + nat i, n, pruned_sparks; // stats only + StgSparkPool *pool; + Capability *cap; + + PAR_TICKY_MARK_SPARK_QUEUE_START(); + + n = 0; + pruned_sparks = 0; + for (i = 0; i < n_capabilities; i++) { + cap = &capabilities[i]; + pool = &(cap->r.rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + +#if defined(PARALLEL_HASKELL) + // stats only + n = 0; + pruned_sparks = 0; +#endif + + sparkp = pool->hd; + to_sparkp = pool->hd; + while (sparkp != pool->tl) { + ASSERT(to_sparkp<=sparkp); + ASSERT(*sparkp!=NULL); + ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp))); + // ToDo?: statistics gathering here (also for GUM!) + if (closure_SHOULD_SPARK(*sparkp)) { + evac(sparkp); + *to_sparkp++ = *sparkp; + n++; + } else { + pruned_sparks++; + } + sparkp++; + if (sparkp == pool->lim) { + sparkp = pool->base; + } + } + pool->tl = to_sparkp; + + PAR_TICKY_MARK_SPARK_QUEUE_END(n); + +#if defined(PARALLEL_HASKELL) + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", + n, pruned_sparks, mytid)); +#else + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks\n", + n, pruned_sparks)); +#endif + + IF_DEBUG(scheduler, + debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)\n", + sparkPoolSize(pool), pool->hd, pool->tl)); + + } +} + +/* ----------------------------------------------------------------------------- + * + * Turn a spark into a real thread + * + * -------------------------------------------------------------------------- */ + +void +createSparkThread (Capability *cap, StgClosure *p) +{ + StgTSO *tso; + + tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p); + appendToRunQueue(cap,tso); +} + +/* ----------------------------------------------------------------------------- + * + * Create a new spark + * + * -------------------------------------------------------------------------- */ + +#define DISCARD_NEW + +StgInt +newSpark (StgRegTable *reg, StgClosure *p) +{ + StgSparkPool *pool = &(reg->rSparks); + + ASSERT_SPARK_POOL_INVARIANTS(pool); + + if (closure_SHOULD_SPARK(p)) { +#ifdef DISCARD_NEW + StgClosure **new_tl; + new_tl = pool->tl + 1; + if (new_tl == pool->lim) { new_tl = pool->base; } + if (new_tl != pool->hd) { + *pool->tl = p; + pool->tl = new_tl; + } else if (!closure_SHOULD_SPARK(*pool->hd)) { + // if the old closure is not sparkable, discard it and + // keep the new one. Otherwise, keep the old one. + *pool->tl = p; + bump_hd(pool); + } +#else /* DISCARD OLD */ + *pool->tl = p; + bump_tl(pool); + if (pool->tl == pool->hd) { bump_hd(pool); } +#endif + } + + ASSERT_SPARK_POOL_INVARIANTS(pool); + return 1; +} + +#endif /* PARALLEL_HASKELL || SMP */ + +/* ----------------------------------------------------------------------------- + * + * GRAN & PARALLEL_HASKELL stuff beyond here. 
+ * + * -------------------------------------------------------------------------- */ + +#if defined(PARALLEL_HASKELL) || defined(GRAN) + +static void slide_spark_pool( StgSparkPool *pool ); + rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) { @@ -131,7 +242,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) pool->tl < pool->lim) { *(pool->tl++) = closure; -#if defined(PAR) +#if defined(PARALLEL_HASKELL) // collect parallel global statistics (currently done together with GC stats) if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { @@ -141,7 +252,7 @@ add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ) #endif return rtsTrue; } else { -#if defined(PAR) +#if defined(PARALLEL_HASKELL) // collect parallel global statistics (currently done together with GC stats) if (RtsFlags.ParFlags.ParStats.Global && RtsFlags.GcFlags.giveStats > NO_GC_STATS) { @@ -175,88 +286,6 @@ slide_spark_pool( StgSparkPool *pool ) pool->tl = to_sparkp; } -nat -spark_queue_len( StgSparkPool *pool ) -{ - return (nat) (pool->tl - pool->hd); -} - -/* Mark all nodes pointed to by sparks in the spark queues (for GC) Does an - implicit slide i.e. after marking all sparks are at the beginning of the - spark pool and the spark pool only contains sparkable closures -*/ -void -markSparkQueue( void ) -{ - StgClosure **sparkp, **to_sparkp; - nat n, pruned_sparks; // stats only - StgSparkPool *pool; - Capability *cap; - - PAR_TICKY_MARK_SPARK_QUEUE_START(); - -#ifdef SMP - /* walk over the capabilities, allocating a spark pool for each one */ - for (cap = free_capabilities; cap != NULL; cap = cap->link) { -#else - /* allocate a single spark pool */ - cap = &MainRegTable; - { -#endif - pool = &(cap->rSparks); - -#if defined(PAR) - // stats only - n = 0; - pruned_sparks = 0; -#endif - - sparkp = pool->hd; - to_sparkp = pool->base; - while (sparkp < pool->tl) { - ASSERT(to_sparkp<=sparkp); - ASSERT(*sparkp!=NULL); - ASSERT(LOOKS_LIKE_GHC_INFO(((StgClosure *)*sparkp)->header.info)); - // ToDo?: statistics gathering here (also for GUM!) 
- if (closure_SHOULD_SPARK(*sparkp)) { - *to_sparkp = MarkRoot(*sparkp); - to_sparkp++; -#ifdef PAR - n++; -#endif - } else { -#ifdef PAR - pruned_sparks++; -#endif - } - sparkp++; - } - pool->hd = pool->base; - pool->tl = to_sparkp; - - PAR_TICKY_MARK_SPARK_QUEUE_END(n); - -#if defined(SMP) - IF_DEBUG(scheduler, - debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, pthread_self())); -#elif defined(PAR) - IF_DEBUG(scheduler, - debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks on [%x]", - n, pruned_sparks, mytid)); -#else - IF_DEBUG(scheduler, - debugBelch("markSparkQueue: marked %d sparks and pruned %d sparks", - n, pruned_sparks)); -#endif - - IF_DEBUG(scheduler, - debugBelch("markSparkQueue: new spark queue len=%d; (hd=%p; tl=%p)", - spark_queue_len(pool), pool->hd, pool->tl)); - - } -} - void disposeSpark(spark) StgClosure *spark; @@ -276,22 +305,11 @@ StgClosure *spark; #elif defined(GRAN) -//@node GranSim code, , GUM code, Spark Management Routines -//@subsection GranSim code - -//@menu -//* Basic interface to sparkq:: -//* Aux fcts:: -//@end menu - -//@node Basic interface to sparkq, Aux fcts, GranSim code, GranSim code -//@subsubsection Basic interface to sparkq /* Search the spark queue of the proc in event for a spark that's worth turning into a thread (was gimme_spark in the old RTS) */ -//@cindex findLocalSpark void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) { @@ -423,7 +441,6 @@ findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res) node pointed to by the spark at some point in the future. (was munch_spark in the old RTS) */ -//@cindex activateSpark rtsBool activateSpark (rtsEvent *event, rtsSparkQ spark) { @@ -530,7 +547,6 @@ STATIC_INLINE nat IGNORE(nat x) { return (0); }; STATIC_INLINE nat RAND(nat x) { return ((random() % MAX_RAND_PRI) + 1); } /* NB: size_info and par_info are currently unused (what a shame!) -- HWL */ -//@cindex newSpark rtsSpark * newSpark(node,name,gran_info,size_info,par_info,local) StgClosure *node; @@ -567,7 +583,6 @@ nat name, gran_info, size_info, par_info, local; return(newspark); } -//@cindex disposeSpark void disposeSpark(spark) rtsSpark *spark; @@ -576,7 +591,6 @@ rtsSpark *spark; stgFree(spark); } -//@cindex disposeSparkQ void disposeSparkQ(spark) rtsSparkQ spark; @@ -602,7 +616,6 @@ rtsSparkQ spark; the queue. */ -//@cindex add_to_spark_queue void add_to_spark_queue(spark) rtsSpark *spark; @@ -709,10 +722,6 @@ rtsSpark *spark; # endif } -//@node Aux fcts, , Basic interface to sparkq, GranSim code -//@subsubsection Aux fcts - -//@cindex spark_queue_len nat spark_queue_len(proc) PEs proc; @@ -740,7 +749,6 @@ PEs proc; hd and tl pointers of the spark queue. Returns a pointer to the next spark in the queue. */ -//@cindex delete_from_sparkq rtsSpark * delete_from_sparkq (spark, p, dispose_too) /* unlink and dispose spark */ rtsSpark *spark; @@ -793,7 +801,6 @@ rtsBool dispose_too; } /* Mark all nodes pointed to by sparks in the spark queues (for GC) */ -//@cindex markSparkQueue void markSparkQueue(void) { @@ -813,7 +820,6 @@ markSparkQueue(void) print_sparkq_stats()); } -//@cindex print_spark void print_spark(spark) rtsSpark *spark; @@ -835,7 +841,6 @@ rtsSpark *spark; } } -//@cindex print_sparkq void print_sparkq(proc) PEs proc; @@ -852,7 +857,6 @@ PEs proc; /* Print a statistics of all spark queues. 
*/ -//@cindex print_sparkq_stats void print_sparkq_stats(void) { diff --git a/ghc/rts/Sparks.h b/ghc/rts/Sparks.h index 3d7687a543..1cc92eba70 100644 --- a/ghc/rts/Sparks.h +++ b/ghc/rts/Sparks.h @@ -9,8 +9,31 @@ #ifndef SPARKS_H #define SPARKS_H -#if defined(GRAN) +#if defined(PARALLEL_HASKELL) || defined(SMP) +StgClosure * findSpark (Capability *cap); +void initSparkPools (void); +void markSparkQueue (evac_fn evac); +void createSparkThread (Capability *cap, StgClosure *p); +StgInt newSpark (StgRegTable *reg, StgClosure *p); + +INLINE_HEADER void discardSparks (StgSparkPool *pool); +INLINE_HEADER nat sparkPoolSize (StgSparkPool *pool); +INLINE_HEADER rtsBool emptySparkPool (StgSparkPool *pool); + +INLINE_HEADER void discardSparksCap (Capability *cap); +INLINE_HEADER nat sparkPoolSizeCap (Capability *cap); +INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap); +#endif + +#if defined(PARALLEL_HASKELL) +StgTSO *activateSpark (rtsSpark spark) ; +rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ); +void markSparkQueue( void ); +nat spark_queue_len( StgSparkPool *pool ); +void disposeSpark( StgClosure *spark ); +#endif +#if defined(GRAN) void findLocalSpark (rtsEvent *event, rtsBool *found_res, rtsSparkQ *spark_res); rtsBool activateSpark (rtsEvent *event, rtsSparkQ spark); rtsSpark *newSpark(StgClosure *node, nat name, nat gran_info, @@ -24,19 +47,48 @@ void print_sparkq(PEs proc); void print_sparkq_stats(void); nat spark_queue_len(PEs proc); void markSparkQueue(void); +#endif + +/* ----------------------------------------------------------------------------- + * PRIVATE below here + * -------------------------------------------------------------------------- */ -#elif defined(PAR) || defined(SMP) +#if defined(PARALLEL_HASKELL) || defined(SMP) + +INLINE_HEADER rtsBool +emptySparkPool (StgSparkPool *pool) +{ + return (pool->hd == pool->tl); +} + +INLINE_HEADER rtsBool +emptySparkPoolCap (Capability *cap) +{ return emptySparkPool(&cap->r.rSparks); } + +INLINE_HEADER nat +sparkPoolSize (StgSparkPool *pool) +{ + if (pool->hd <= pool->tl) { + return (pool->hd - pool->tl); + } else { + return (pool->lim - pool->hd + pool->tl - pool->base); + } +} + +INLINE_HEADER nat +sparkPoolSizeCap (Capability *cap) +{ return sparkPoolSize(&cap->r.rSparks); } + +INLINE_HEADER void +discardSparks (StgSparkPool *pool) +{ + pool->hd = pool->tl; +} + +INLINE_HEADER void +discardSparksCap (Capability *cap) +{ return discardSparks(&cap->r.rSparks); } -StgClosure *findSpark( rtsBool ); -void initSparkPools( void ); -void markSparkQueue( void ); -#if defined(PAR) -StgTSO *activateSpark (rtsSpark spark) ; -rtsBool add_to_spark_queue( StgClosure *closure, StgSparkPool *pool ); -void markSparkQueue( void ); -nat spark_queue_len( StgSparkPool *pool ); -void disposeSpark( StgClosure *spark ); -#endif #endif diff --git a/ghc/rts/StgCRun.c b/ghc/rts/StgCRun.c index fc08b50bb3..54ebfe195a 100644 --- a/ghc/rts/StgCRun.c +++ b/ghc/rts/StgCRun.c @@ -203,7 +203,8 @@ StgRun(StgFunPtr f, StgRegTable *basereg) { extern StgRegTable * StgRun(StgFunPtr f, StgRegTable *basereg); -static void StgRunIsImplementedInAssembler(void) +void StgRunIsImplementedInAssembler(void); +void StgRunIsImplementedInAssembler(void) { __asm__ volatile ( /* diff --git a/ghc/rts/StgStartup.cmm b/ghc/rts/StgStartup.cmm index 3569a390bf..2f2a759c81 100644 --- a/ghc/rts/StgStartup.cmm +++ b/ghc/rts/StgStartup.cmm @@ -118,7 +118,7 @@ stg_returnToStackTop stg_returnToSched { SAVE_THREAD_STATE(); - foreign "C" threadPaused(CurrentTSO); + 
foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO); jump StgReturn; } @@ -139,7 +139,7 @@ stg_returnToSchedNotPaused stg_returnToSchedButFirst { SAVE_THREAD_STATE(); - foreign "C" threadPaused(CurrentTSO); + foreign "C" threadPaused(MyCapability() "ptr", CurrentTSO); jump R2; } diff --git a/ghc/rts/Updates.cmm b/ghc/rts/Updates.cmm index 02d182764f..1d2fc5fe0f 100644 --- a/ghc/rts/Updates.cmm +++ b/ghc/rts/Updates.cmm @@ -102,6 +102,20 @@ INFO_TABLE_RET( stg_upd_frame, ) UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0))) + +INFO_TABLE_RET( stg_marked_upd_frame, + UPD_FRAME_WORDS, UPD_FRAME_BITMAP, UPDATE_FRAME, + stg_upd_frame_0_ret, + stg_upd_frame_1_ret, + stg_upd_frame_2_ret, + stg_upd_frame_3_ret, + stg_upd_frame_4_ret, + stg_upd_frame_5_ret, + stg_upd_frame_6_ret, + stg_upd_frame_7_ret + ) +UPD_FRAME_ENTRY_TEMPLATE(,stg_IND_direct_info,%ENTRY_CODE(Sp(0))) + /*----------------------------------------------------------------------------- Seq frames diff --git a/ghc/rts/Updates.h b/ghc/rts/Updates.h index fd48fb8e3d..7b0dc3a3d5 100644 --- a/ghc/rts/Updates.h +++ b/ghc/rts/Updates.h @@ -204,16 +204,14 @@ extern void awakenBlockedQueue(StgBlockingQueueElement *q, StgClosure *node); W_ sz; \ W_ i; \ inf = %GET_STD_INFO(p); \ - if (%INFO_TYPE(inf) == HALF_W_(THUNK_SELECTOR)) { \ - StgThunk_payload(p,0) = 0; \ - } else { \ + if (%INFO_TYPE(inf) != HALF_W_(THUNK_SELECTOR)) { \ if (%INFO_TYPE(inf) != HALF_W_(BLACKHOLE)) { \ if (%INFO_TYPE(inf) == HALF_W_(AP_STACK)) { \ sz = StgAP_STACK_size(p) + BYTES_TO_WDS(SIZEOF_StgAP_STACK_NoHdr); \ } else { \ sz = TO_W_(%INFO_PTRS(inf)) + TO_W_(%INFO_NPTRS(inf)); \ } \ - i = 0; \ + i = 1; /* skip over indirectee */ \ for: \ if (i < sz) { \ StgThunk_payload(p,i) = 0; \ @@ -232,20 +230,17 @@ DEBUG_FILL_SLOP(StgClosure *p) switch (inf->type) { case BLACKHOLE: + case THUNK_SELECTOR: return; case AP_STACK: sz = ((StgAP_STACK *)p)->size + sizeofW(StgAP_STACK) - sizeofW(StgHeader); break; - case THUNK_SELECTOR: -#ifdef SMP - ((StgSelector *)p)->selectee = 0; -#endif - return; default: sz = inf->layout.payload.ptrs + inf->layout.payload.nptrs; break; } - for (i = 0; i < sz; i++) { + // start at one to skip over the indirectee + for (i = 1; i < sz; i++) { ((StgThunk *)p)->payload[i] = 0; } } |