diff options
author | berthold@mathematik.uni-marburg.de <unknown> | 2008-09-15 13:28:46 +0000 |
---|---|---|
committer | berthold@mathematik.uni-marburg.de <unknown> | 2008-09-15 13:28:46 +0000 |
commit | cf9650f2a1690c04051c716124bb0350adc74ae7 (patch) | |
tree | f2e622b8eea04515fc4a19d24a1ecb57a654b33d /rts | |
parent | 7eeac4d143e9287d7c2e27ba23b84d175df49962 (diff) | |
download | haskell-cf9650f2a1690c04051c716124bb0350adc74ae7.tar.gz |
Work stealing for sparks
Spark stealing support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
Spark pools are per capability, separately allocated and held in the Capability
structure. The implementation uses Double-Ended Queues (deque) and cas-protected
access.
The write end of the queue (position bottom) can only be used with
mutual exclusion, i.e. by exactly one caller at a time.
Multiple readers can steal()/findSpark() from the read end
(position top), and are synchronised without a lock, based on a cas
of the top position. One reader wins, the others return NULL for a
failure.
Work stealing is called when Capabilities find no other work (inside yieldCapability),
and tries all capabilities 0..n-1 twice, unless a theft succeeds.
Inside schedulePushWork, all considered cap.s (those which were idle and could
be grabbed) are woken up. Future versions should wake up capabilities immediately when
putting a new spark in the local pool, from newSpark().
Patch has been re-recorded due to conflicting bugfixes in the sparks.c, also fixing a
(strange) conflict in the scheduler.
Diffstat (limited to 'rts')
-rw-r--r-- | rts/Capability.c | 57 | ||||
-rw-r--r-- | rts/Capability.h | 4 | ||||
-rw-r--r-- | rts/Schedule.c | 110 | ||||
-rw-r--r-- | rts/Sparks.c | 517 | ||||
-rw-r--r-- | rts/Sparks.h | 69 |
5 files changed, 582 insertions, 175 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 4d5748cec2..516aaa573d 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -54,6 +54,55 @@ globalWorkToDo (void) #endif #if defined(THREADED_RTS) +rtsBool stealWork( Capability *cap) { + /* use the normal Sparks.h interface (internally modified to enable + concurrent stealing) + and immediately turn the spark into a thread when successful + */ + Capability *robbed; + SparkPool *pool; + StgClosurePtr spark; + rtsBool success = rtsFalse; + nat i = 0; + + debugTrace(DEBUG_sched, + "cap %d: Trying to steal work from other capabilities", + cap->no); + + if (n_capabilities == 1) { return rtsFalse; } // makes no sense... + + /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could + start at a random place instead of 0 as well. */ + for ( i=0 ; i < n_capabilities ; i++ ) { + robbed = &capabilities[i]; + if (cap == robbed) // ourselves... + continue; + + if (emptySparkPoolCap(robbed)) // nothing to steal here + continue; + + spark = findSpark(robbed); + + if (spark == NULL && !emptySparkPoolCap(robbed)) { + spark = findSpark(robbed); // lost race in concurrent access, try again + } + if (spark != NULL) { + debugTrace(DEBUG_sched, + "cap %d: Stole a spark from capability %d", + cap->no, robbed->no); + + createSparkThread(cap,spark); + success = rtsTrue; + break; // got one, leave the loop + } + // otherwise: no success, try next one + } + debugTrace(DEBUG_sched, + "Leaving work stealing routine (%s)", + success?"one spark stolen":"thefts did not succeed"); + return success; +} + STATIC_INLINE rtsBool anyWorkForMe( Capability *cap, Task *task ) { @@ -73,9 +122,11 @@ anyWorkForMe( Capability *cap, Task *task ) if (emptyRunQueue(cap)) { return !emptySparkPoolCap(cap) || !emptyWakeupQueue(cap) - || globalWorkToDo(); - } else + || globalWorkToDo() + || stealWork(cap); /* if all false: try to steal work */ + } else { return cap->run_queue_hd->bound == NULL; + } } } #endif @@ -778,7 +829,7 @@ void freeCapability (Capability *cap) { stgFree(cap->mut_lists); #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - freeSparkPool(&cap->r.rSparks); + freeSparkPool(cap->sparks); #endif } diff --git a/rts/Capability.h b/rts/Capability.h index 70d9ee9e8d..59458951eb 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -23,6 +23,7 @@ #ifndef CAPABILITY_H #define CAPABILITY_H +#include "RtsTypes.h" #include "RtsFlags.h" #include "Task.h" @@ -98,6 +99,9 @@ struct Capability_ { StgTRecChunk *free_trec_chunks; StgTRecHeader *free_trec_headers; nat transaction_tokens; + + SparkPool *sparks; + }; // typedef Capability, defined in RtsAPI.h diff --git a/rts/Schedule.c b/rts/Schedule.c index 626c097a82..09150fd8b5 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -137,17 +137,17 @@ static Capability *schedule (Capability *initialCapability, Task *task); // scheduler clearer. // static void schedulePreLoop (void); -#if defined(THREADED_RTS) -static void schedulePushWork(Capability *cap, Task *task); -#endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) +#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +static void schedulePushWork(Capability *cap, Task *task); static rtsBool scheduleGetRemoteWork(Capability *cap); +#if defined(PARALLEL_HASKELL) static void scheduleSendPendingMessages(void); +#endif static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -291,13 +291,15 @@ schedule (Capability *initialCapability, Task *task) } else { // Yield the capability to higher-priority tasks if necessary. yieldCapability(&cap, task); + /* inside yieldCapability, attempts to steal work from other + capabilities, unless the capability has own work. + See (REMARK) below. + */ } #endif - -#if defined(THREADED_RTS) - schedulePushWork(cap,task); -#endif + /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */ + // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). @@ -365,21 +367,7 @@ schedule (Capability *initialCapability, Task *task) barf("sched_state: %d", sched_state); } -#if defined(THREADED_RTS) - // If the run queue is empty, take a spark and turn it into a thread. - { - if (emptyRunQueue(cap)) { - StgClosure *spark; - spark = findSpark(cap); - if (spark != NULL) { - debugTrace(DEBUG_sched, - "turning spark of closure %p into a thread", - (StgClosure *)spark); - createSparkThread(cap,spark); - } - } - } -#endif // THREADED_RTS + /* this was the place to activate a spark, now below... */ scheduleStartSignalHandlers(cap); @@ -393,11 +381,19 @@ schedule (Capability *initialCapability, Task *task) scheduleCheckBlockedThreads(cap); -#if defined(PARALLEL_HASKELL) - /* message processing and work distribution goes here */ +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + /* work distribution in multithreaded and parallel systems + + REMARK: IMHO best location for work-stealing as well. + tests above might yield some new jobs, so no need to steal a + spark in some cases. I believe the yieldCapability.. above + should be moved here. + */ +#if defined(PARALLEL_HASKELL) /* if messages have been buffered... a NOOP in THREADED_RTS */ scheduleSendPendingMessages(); +#endif /* If the run queue is empty,...*/ if (emptyRunQueue(cap)) { @@ -406,6 +402,7 @@ schedule (Capability *initialCapability, Task *task) /* if this did not work, try to steal a spark from someone else */ if (emptyRunQueue(cap)) { +#if defined(PARALLEL_HASKELL) receivedFinish = scheduleGetRemoteWork(cap); continue; // a new round, (hopefully) with new work /* @@ -414,10 +411,20 @@ schedule (Capability *initialCapability, Task *task) b) (blocking) awaits and receives messages in Eden, this is only the blocking receive, as b) in GUM. + + in Threaded-RTS, this does plain nothing. Stealing routine + is inside Capability.c and called from + yieldCapability() at the very beginning, see REMARK. */ +#endif } - } + } else { /* i.e. run queue was (initially) not empty */ + schedulePushWork(cap,task); + /* work pushing, currently relevant only for THREADED_RTS: + (pushes threads, wakes up idle capabilities for stealing) */ + } +#if defined(PARALLEL_HASKELL) /* since we perform a blocking receive and continue otherwise, either we never reach here or we definitely have work! */ // from here: non-empty run queue @@ -430,7 +437,9 @@ schedule (Capability *initialCapability, Task *task) above, waits for messages as well! */ processMessages(cap, &receivedFinish); } -#endif // PARALLEL_HASKELL +#endif // PARALLEL_HASKELL: non-empty run queue! + +#endif /* THREADED_RTS || PARALLEL_HASKELL */ scheduleDetectDeadlock(cap,task); #if defined(THREADED_RTS) @@ -679,11 +688,15 @@ schedulePreLoop(void) * Push work to other Capabilities if we have some. * -------------------------------------------------------------------------- */ -#if defined(THREADED_RTS) +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) static void schedulePushWork(Capability *cap USED_IF_THREADS, Task *task USED_IF_THREADS) { + /* following code not for PARALLEL_HASKELL. I kept the call general, + future GUM versions might use pushing in a distributed setup */ +#if defined(THREADED_RTS) + Capability *free_caps[n_capabilities], *cap0; nat i, n_free_caps; @@ -726,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS, StgTSO *prev, *t, *next; rtsBool pushed_to_all; - debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps); + debugTrace(DEBUG_sched, + "cap %d: %s and %d free capabilities, sharing...", + cap->no, + (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)? + "excess threads on run queue":"sparks to share (>=2)", + n_free_caps); i = 0; pushed_to_all = rtsFalse; @@ -760,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, cap->run_queue_tl = prev; } +#ifdef SPARK_PUSHING + /* JB I left this code in place, it would work but is not necessary */ + // If there are some free capabilities that we didn't push any // threads to, then try to push a spark to each one. if (!pushed_to_all) { @@ -775,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } } } +#endif /* SPARK_PUSHING */ // release the capabilities for (i = 0; i < n_free_caps; i++) { task->cap = free_caps[i]; releaseCapability(free_caps[i]); } + // now wake them all up, and they might steal sparks if + // the did not get a thread + prodAllCapabilities(); } task->cap = cap; // reset to point to our Capability. + +#endif /* THREADED_RTS */ + } -#endif +#endif /* THREADED_RTS || PARALLEL_HASKELL */ /* ---------------------------------------------------------------------------- * Start any pending signal handlers @@ -965,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task) * ------------------------------------------------------------------------- */ #if defined(PARALLEL_HASKELL) -static StgTSO * +static void scheduleSendPendingMessages(void) { @@ -984,10 +1012,10 @@ scheduleSendPendingMessages(void) #endif /* ---------------------------------------------------------------------------- - * Activate spark threads (PARALLEL_HASKELL only) + * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) +#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap) { @@ -1012,14 +1040,14 @@ scheduleActivateSpark(Capability *cap) createSparkThread(cap,spark); // defined in Sparks.c } } -#endif // PARALLEL_HASKELL +#endif // PARALLEL_HASKELL || THREADED_RTS /* ---------------------------------------------------------------------------- * Get work from a remote node (PARALLEL_HASKELL only) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) -static rtsBool +#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +static rtsBool /* return value used in PARALLEL_HASKELL only */ scheduleGetRemoteWork(Capability *cap) { #if defined(PARALLEL_HASKELL) @@ -1057,7 +1085,7 @@ scheduleGetRemoteWork(Capability *cap) #endif /* PARALLEL_HASKELL */ } -#endif // PARALLEL_HASKELL +#endif // PARALLEL_HASKELL || THREADED_RTS /* ---------------------------------------------------------------------------- * After running a thread... @@ -1483,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major) performHeapProfile = rtsFalse; } +#ifdef SPARKBALANCE + /* JB + Once we are all together... this would be the place to balance all + spark pools. No concurrent stealing or adding of new sparks can + occur. Should be defined in Sparks.c. */ + balanceSparkPoolsCaps(n_capabilities, capabilities); +#endif + #if defined(THREADED_RTS) // release our stash of capabilities. for (i = 0; i < n_capabilities; i++) { diff --git a/rts/Sparks.c b/rts/Sparks.c index 2e9e61ca62..ac11172a9d 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -1,10 +1,39 @@ /* --------------------------------------------------------------------------- * - * (c) The GHC Team, 2000-2006 + * (c) The GHC Team, 2000-2008 * * Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS. * - * -------------------------------------------------------------------------*/ + * The implementation uses Double-Ended Queues with lock-free access + * (thereby often called "deque") as described in + * + * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque. + * SPAA'05, July 2005, Las Vegas, USA. + * ACM 1-58113-986-1/05/0007 + * + * Author: Jost Berthold MSRC 07-09/2008 + * + * The DeQue is held as a circular array with known length. Positions + * of top (read-end) and bottom (write-end) always increase, and the + * array is accessed with indices modulo array-size. While this bears + * the risk of overflow, we assume that (with 64 bit indices), a + * program must run very long to reach that point. + * + * The write end of the queue (position bottom) can only be used with + * mutual exclusion, i.e. by exactly one caller at a time. At this + * end, new items can be enqueued using pushBottom()/newSpark(), and + * removed using popBottom()/reclaimSpark() (the latter implying a cas + * synchronisation with potential concurrent readers for the case of + * just one element). + * + * Multiple readers can steal()/findSpark() from the read end + * (position top), and are synchronised without a lock, based on a cas + * of the top position. One reader wins, the others return NULL for a + * failure. + * + * Both popBottom and steal also return NULL when the queue is empty. + * + -------------------------------------------------------------------------*/ #include "PosixSource.h" #include "Rts.h" @@ -14,22 +43,52 @@ #include "RtsFlags.h" #include "RtsUtils.h" #include "ParTicky.h" -# if defined(PARALLEL_HASKELL) -# include "ParallelRts.h" -# include "GranSimRts.h" // for GR_... -# elif defined(GRAN) -# include "GranSimRts.h" -# endif -#include "Sparks.h" #include "Trace.h" +#include "SMP.h" // for cas + +#include "Sparks.h" + #if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) -static INLINE_ME void bump_hd (StgSparkPool *p) -{ p->hd++; if (p->hd == p->lim) p->hd = p->base; } +/* internal helpers ... */ + +StgWord roundUp2(StgWord val); -static INLINE_ME void bump_tl (StgSparkPool *p) -{ p->tl++; if (p->tl == p->lim) p->tl = p->base; } +StgWord roundUp2(StgWord val) { + StgWord rounded = 1; + + /* StgWord is unsigned anyway, only catch 0 */ + if (val == 0) { + barf("DeQue,roundUp2: invalid size 0 requested"); + } + /* at least 1 bit set, shift up to its place */ + do { + rounded = rounded << 1; + } while (0 != (val = val>>1)); + return rounded; +} + +INLINE_HEADER +rtsBool casTop(StgPtr addr, StgWord old, StgWord new); + +#if !defined(THREADED_RTS) +/* missing def. in non THREADED RTS, and makes no sense anyway... */ +StgWord cas(StgPtr addr,StgWord old,StgWord new); +StgWord cas(StgPtr addr,StgWord old,StgWord new) { + barf("cas: not implemented without multithreading"); + old = new = *addr; /* to avoid gcc warnings */ +} +#endif + +INLINE_HEADER +rtsBool casTop(StgWord* addr, StgWord old, StgWord new) { + StgWord res = cas((StgPtr) addr, old, new); + return ((res == old)); +} + +/* or simply like this */ +#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) /* ----------------------------------------------------------------------------- * @@ -37,15 +96,28 @@ static INLINE_ME void bump_tl (StgSparkPool *p) * * -------------------------------------------------------------------------- */ -static void -initSparkPool(StgSparkPool *pool) -{ - pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks - * sizeof(StgClosure *), - "initSparkPools"); - pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks; - pool->hd = pool->base; - pool->tl = pool->base; +/* constructor */ +SparkPool* initPool(StgWord size) { + + StgWord realsize; + SparkPool *q; + + realsize = roundUp2(size); /* to compute modulo as a bitwise & */ + + q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */ + "newSparkPool"); + q->elements = (StgClosurePtr*) + stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ + "newSparkPool:data space"); + q->top=0; + q->bottom=0; + q->topBound=0; /* read by writer, updated each time top is read */ + + q->size = realsize; /* power of 2 */ + q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ + + ASSERT_SPARK_POOL_INVARIANTS(q); + return q; } void @@ -55,17 +127,71 @@ initSparkPools( void ) /* walk over the capabilities, allocating a spark pool for each one */ nat i; for (i = 0; i < n_capabilities; i++) { - initSparkPool(&capabilities[i].r.rSparks); + capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); } #else /* allocate a single spark pool */ - initSparkPool(&MainCapability.r.rSparks); + MainCapability->sparks = initPool(RtsFlags.ParFlags.maxLocalSparks); #endif } void -freeSparkPool(StgSparkPool *pool) { - stgFree(pool->base); +freeSparkPool(SparkPool *pool) { + /* should not interfere with concurrent findSpark() calls! And + nobody should use the pointer any more. We cross our fingers...*/ + stgFree(pool->elements); + stgFree(pool); +} + +/* reclaimSpark(cap): remove a spark from the write end of the queue. + * Returns the removed spark, and NULL if a race is lost or the pool + * empty. + * + * If only one spark is left in the pool, we synchronise with + * concurrently stealing threads by using cas to modify the top field. + * This routine should NEVER be called by a task which does not own + * the capability. Can this be checked here? + */ +StgClosure* reclaimSpark(Capability *cap) { + SparkPool *deque = cap->sparks; + /* also a bit tricky, has to avoid concurrent steal() calls by + accessing top with cas, when there is only one element left */ + StgWord t, b; + StgClosurePtr* pos; + long currSize; + StgClosurePtr removed; + + ASSERT_SPARK_POOL_INVARIANTS(deque); + + b = deque->bottom; + /* "decrement b as a test, see what happens" */ + deque->bottom = --b; + pos = (deque->elements) + (b & (deque->moduloSize)); + t = deque->top; /* using topBound would give an *upper* bound, we + need a lower bound. We use the real top here, but + can update the topBound value */ + deque->topBound = t; + currSize = b - t; + if (currSize < 0) { /* was empty before decrementing b, set b + consistently and abort */ + deque->bottom = t; + return NULL; + } + removed = *pos; + if (currSize > 0) { /* no danger, still elements in buffer after b-- */ + return removed; + } + /* otherwise, has someone meanwhile stolen the same (last) element? + Check and increment top value to know */ + if ( !(CASTOP(&(deque->top),t,t+1)) ) { + removed = NULL; /* no success, but continue adjusting bottom */ + } + deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */ + deque->topBound = t+1; /* ...and cached top value as well */ + + ASSERT_SPARK_POOL_INVARIANTS(deque); + + return removed; } /* ----------------------------------------------------------------------------- @@ -73,32 +199,80 @@ freeSparkPool(StgSparkPool *pool) { * findSpark: find a spark on the current Capability that we can fork * into a thread. * - * -------------------------------------------------------------------------- */ + * May be called by concurrent threads, which synchronise on top + * variable. Returns a spark, or NULL if pool empty or race lost. + * + -------------------------------------------------------------------------- */ + +StgClosurePtr steal(SparkPool *deque); + +/* steal an element from the read end. Synchronises multiple callers + by failing with NULL return. Returns NULL when deque is empty. */ +StgClosurePtr steal(SparkPool *deque) { + StgClosurePtr* pos; + StgClosurePtr* arraybase; + StgWord sz; + StgClosurePtr stolen; + StgWord b,t; + + ASSERT_SPARK_POOL_INVARIANTS(deque); + + b = deque->bottom; + t = deque->top; + if (b - t <= 0 ) { + return NULL; /* already looks empty, abort */ + } + + /* now access array, see pushBottom() */ + arraybase = deque->elements; + sz = deque->moduloSize; + pos = arraybase + (t & sz); + stolen = *pos; + + /* now decide whether we have won */ + if ( !(CASTOP(&(deque->top),t,t+1)) ) { + /* lost the race, someon else has changed top in the meantime */ + stolen = NULL; + } /* else: OK, top has been incremented by the cas call */ + + + ASSERT_SPARK_POOL_INVARIANTS(deque); + /* return NULL or stolen element */ + return stolen; +} StgClosure * findSpark (Capability *cap) { - StgSparkPool *pool; - StgClosure *spark; + SparkPool *deque = (cap->sparks); + StgClosure *stolen; + + ASSERT_SPARK_POOL_INVARIANTS(deque); + + do { + /* keep trying until good spark found or pool looks empty. + TODO is this a good idea? */ + + stolen = steal(deque); - pool = &(cap->r.rSparks); - ASSERT_SPARK_POOL_INVARIANTS(pool); + } while ( ( !stolen /* nothing stolen*/ + || !closure_SHOULD_SPARK(stolen)) /* spark not OK */ + && !looksEmpty(deque)); /* run empty, give up */ - while (pool->hd != pool->tl) { - spark = *pool->hd; - bump_hd(pool); - if (closure_SHOULD_SPARK(spark)) { -#ifdef GRAN - if (RtsFlags.ParFlags.ParStats.Sparks) - DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC, - GR_STEALING, ((StgTSO *)NULL), spark, - 0, 0 /* spark_queue_len(ADVISORY_POOL) */); -#endif - return spark; - } - } - // spark pool is now empty - return NULL; + /* return stolen element */ + return stolen; +} + + +/* "guesses" whether a deque is empty. Can return false negatives in + presence of concurrent steal() calls, and false positives in + presence of a concurrent pushBottom().*/ +rtsBool looksEmpty(SparkPool* deque) { + StgWord t = deque->top; + StgWord b = deque->bottom; + /* try to prefer false negatives by reading top first */ + return (b - t <= 0); + /* => array is *never* completely filled, always 1 place free! */ } /* ----------------------------------------------------------------------------- @@ -123,11 +297,64 @@ createSparkThread (Capability *cap, StgClosure *p) * -------------------------------------------------------------------------- */ #define DISCARD_NEW +void pushBottom(SparkPool* deque, StgClosurePtr elem); + +/* enqueue an element. Should always succeed by resizing the array + (not implemented yet, silently fails in that case). */ +void pushBottom(SparkPool* deque, StgClosurePtr elem) { + StgWord t; + StgClosurePtr* pos; + StgWord sz = deque->moduloSize; + StgWord b = deque->bottom; + + ASSERT_SPARK_POOL_INVARIANTS(deque); + + /* we try to avoid reading deque->top (accessed by all) and use + deque->topBound (accessed only by writer) instead. + This is why we do not just call empty(deque) here. + */ + t = deque->topBound; + if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */ + /* could be full, check the real top value in this case */ + t = deque->top; + deque->topBound = t; + if (b - t >= sz) { /* really no space left :-( */ + /* reallocate the array, copying the values. Concurrent steal()s + will in the meantime use the old one and modify only top. + This means: we cannot safely free the old space! Can keep it + on a free list internally here... + + Potential bug in combination with steal(): if array is + replaced, it is unclear which one concurrent steal operations + use. Must read the array base address in advance in steal(). + */ +#if defined(DISCARD_NEW) + ASSERT_SPARK_POOL_INVARIANTS(deque); + return; /* for now, silently fail */ +#else + /* could make room by incrementing the top position here. In + * this case, should use CASTOP. If this fails, someone else has + * removed something, and new room will be available. + */ + ASSERT_SPARK_POOL_INVARIANTS(deque); +#endif + } + } + pos = (deque->elements) + (b & sz); + *pos = elem; + (deque->bottom)++; + + ASSERT_SPARK_POOL_INVARIANTS(deque); + return; +} + +/* this is called as a direct C-call from Stg => we need to keep the + pool in a register (???) */ StgInt newSpark (StgRegTable *reg, StgClosure *p) { - StgSparkPool *pool = &(reg->rSparks); + SparkPool *pool = (reg->rCurrentTSO->cap->sparks); /* I am not sure whether this is the right thing to do. * Maybe it is better to exploit the tag information @@ -138,82 +365,125 @@ newSpark (StgRegTable *reg, StgClosure *p) ASSERT_SPARK_POOL_INVARIANTS(pool); if (closure_SHOULD_SPARK(p)) { -#ifdef DISCARD_NEW - StgClosure **new_tl; - new_tl = pool->tl + 1; - if (new_tl == pool->lim) { new_tl = pool->base; } - if (new_tl != pool->hd) { - *pool->tl = p; - pool->tl = new_tl; - } else if (!closure_SHOULD_SPARK(*pool->hd)) { - // if the old closure is not sparkable, discard it and - // keep the new one. Otherwise, keep the old one. - *pool->tl = p; - bump_hd(pool); - } -#else /* DISCARD OLD */ - *pool->tl = p; - bump_tl(pool); - if (pool->tl == pool->hd) { bump_hd(pool); } -#endif + pushBottom(pool,p); } ASSERT_SPARK_POOL_INVARIANTS(pool); return 1; } -/* ----------------------------------------------------------------------------- - * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an - * implicit slide i.e. after marking all sparks are at the beginning of the - * spark pool and the spark pool only contains sparkable closures + + +/* -------------------------------------------------------------------------- + * Remove all sparks from the spark queues which should not spark any + * more. Called after GC. We assume exclusive access to the structure + * and replace all sparks in the queue, see explanation below. At exit, + * the spark pool only contains sparkable closures. * -------------------------------------------------------------------------- */ static void pruneSparkQueue (Capability *cap) { - StgClosure *spark, **sparkp, **to_sparkp; + SparkPool *pool; + StgClosurePtr spark, evacspark, *elements; nat n, pruned_sparks; // stats only - StgSparkPool *pool; + StgWord botInd,oldBotInd,currInd; // indices in array (always < size) PAR_TICKY_MARK_SPARK_QUEUE_START(); n = 0; pruned_sparks = 0; - pool = &(cap->r.rSparks); + pool = cap->sparks; + debugTrace(DEBUG_sched, + "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); ASSERT_SPARK_POOL_INVARIANTS(pool); - - sparkp = pool->hd; - to_sparkp = pool->hd; - while (sparkp != pool->tl) { - ASSERT(*sparkp!=NULL); - ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp))); - // ToDo?: statistics gathering here (also for GUM!) - spark = *sparkp; - if (!closure_SHOULD_SPARK(spark)) { - pruned_sparks++; - } else{ - *to_sparkp++ = spark; - if (to_sparkp == pool->lim) { - to_sparkp = pool->base; - } - n++; - } - sparkp++; - if (sparkp == pool->lim) { - sparkp = pool->base; - } - } - pool->tl = to_sparkp; - + + elements = pool->elements; + + /* We have exclusive access to the structure here, so we can reset + bottom and top counters, and prune invalid sparks. Contents are + copied in-place if they are valuable, otherwise discarded. The + routine uses "real" indices t and b, starts by computing them + as the modulus size of top and bottom, + + Copying: + + At the beginning, the pool structure can look like this: + ( bottom % size >= top % size , no wrap-around) + t b + ___________***********_________________ + + or like this ( bottom % size < top % size, wrap-around ) + b t + ***********__________****************** + As we need to remove useless sparks anyway, we make one pass + between t and b, moving valuable content to b and subsequent + cells (wrapping around when the size is reached). + + b t + ***********OOO_______XX_X__X?********** + ^____move?____/ + + After this movement, botInd becomes the new bottom, and old + bottom becomes the new top index, both as indices in the array + size range. + */ + // starting here + currInd = (pool->top) & (pool->moduloSize); // mod + + // copies of evacuated closures go to space from botInd on + // we keep oldBotInd to know when to stop + oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod + + // on entry to loop, we are within the bounds + ASSERT( currInd < pool->size && botInd < pool->size ); + + while (currInd != oldBotInd ) { + /* must use != here, wrap-around at size + subtle: loop not entered if queue empty + */ + + /* check element at currInd. if valuable, evacuate and move to + botInd, otherwise move on */ + spark = elements[currInd]; + + /* if valuable work: shift inside the pool */ + if ( closure_SHOULD_SPARK(spark) ) { + elements[botInd] = spark; // keep entry (new address) + botInd++; + n++; + } else { + pruned_sparks++; // discard spark + } + currInd++; + + // in the loop, we may reach the bounds, and instantly wrap around + ASSERT( currInd <= pool->size && botInd <= pool->size ); + if ( currInd == pool->size ) { currInd = 0; } + if ( botInd == pool->size ) { botInd = 0; } + + } // while-loop over spark pool elements + + ASSERT(currInd == oldBotInd); + + pool->top = oldBotInd; // where we started writing + pool->topBound = pool->top; + + pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); + // first free place we did not use (corrected by wraparound) + PAR_TICKY_MARK_SPARK_QUEUE_END(n); - + debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks); debugTrace(DEBUG_sched, - "new spark queue len=%d; (hd=%p; tl=%p)", - sparkPoolSize(pool), pool->hd, pool->tl); + "new spark queue len=%d; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); + + ASSERT_SPARK_POOL_INVARIANTS(pool); } void @@ -225,21 +495,50 @@ pruneSparkQueues (void) } } +/* GC for the spark pool, called inside Capability.c for all + capabilities in turn. Blindly "evac"s complete spark pool. */ void traverseSparkQueue (evac_fn evac, void *user, Capability *cap) { StgClosure **sparkp; - StgSparkPool *pool; + SparkPool *pool; + StgWord top,bottom, modMask; - pool = &(cap->r.rSparks); - sparkp = pool->hd; - while (sparkp != pool->tl) { - evac(user, sparkp); - sparkp++; - if (sparkp == pool->lim) { - sparkp = pool->base; - } + pool = cap->sparks; + + ASSERT_SPARK_POOL_INVARIANTS(pool); + + top = pool->top; + bottom = pool->bottom; + sparkp = pool->elements; + modMask = pool->moduloSize; + + while (top < bottom) { + /* call evac for all closures in range (wrap-around via modulo) + * In GHC-6.10, evac takes an additional 1st argument to hold a + * GC-specific register, see rts/sm/GC.c::mark_root() + */ + evac( user , sparkp + (top & modMask) ); + top++; } + + debugTrace(DEBUG_sched, + "traversed spark queue, len=%d; (hd=%ld; tl=%ld)", + sparkPoolSize(pool), pool->bottom, pool->top); +} + +/* ---------------------------------------------------------------------------- + + * balanceSparkPoolsCaps: takes an array of capabilities (usually: all + * capabilities) and its size. Accesses all spark pools and equally + * distributes the sparks among them. + * + * Could be called after GC, before Cap. release, from scheduler. + * -------------------------------------------------------------------------- */ +void balanceSparkPoolsCaps(nat n_caps, Capability caps[]); + +void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) { + barf("not implemented"); } #else @@ -259,6 +558,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED) * * GRAN & PARALLEL_HASKELL stuff beyond here. * + * TODO "nuke" this! + * * -------------------------------------------------------------------------- */ #if defined(PARALLEL_HASKELL) || defined(GRAN) diff --git a/rts/Sparks.h b/rts/Sparks.h index 8e0ba90f3d..dbbf268988 100644 --- a/rts/Sparks.h +++ b/rts/Sparks.h @@ -9,17 +9,45 @@ #ifndef SPARKS_H #define SPARKS_H +#if defined(PARALLEL_HASKELL) +#error Sparks.c using new internal structure, needs major overhaul! +#endif + +/* typedef for SparkPool in RtsTypes.h */ + #if defined(THREADED_RTS) + +/* INVARIANTS, in this order: bottom/top consistent, reasonable size, + topBound consistent, space pointer, space accessible to us */ +#define ASSERT_SPARK_POOL_INVARIANTS(p) \ + ASSERT((p)->bottom >= (p)->top); \ + ASSERT((p)->size > 0); \ + ASSERT((p)->size > (p)->bottom - (p)->top); \ + ASSERT((p)->topBound <= (p)->top); \ + ASSERT((p)->elements != NULL); \ + ASSERT(*((p)->elements) || 1); \ + ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); + +// missing in old interface. Currently called by initSparkPools +// internally. +SparkPool* initPool(StgWord size); + +// special case: accessing our own pool, at the write end +// otherwise, we can always steal from our pool as the others do... +StgClosure* reclaimSpark(Capability *cap); + +rtsBool looksEmpty(SparkPool* deque); + +// rest: same as old interface StgClosure * findSpark (Capability *cap); void initSparkPools (void); -void freeSparkPool (StgSparkPool *pool); +void freeSparkPool (SparkPool *pool); void createSparkThread (Capability *cap, StgClosure *p); void pruneSparkQueues (void); void traverseSparkQueue(evac_fn evac, void *user, Capability *cap); -INLINE_HEADER void discardSparks (StgSparkPool *pool); -INLINE_HEADER nat sparkPoolSize (StgSparkPool *pool); -INLINE_HEADER rtsBool emptySparkPool (StgSparkPool *pool); +INLINE_HEADER void discardSparks (SparkPool *pool); +INLINE_HEADER nat sparkPoolSize (SparkPool *pool); INLINE_HEADER void discardSparksCap (Capability *cap); INLINE_HEADER nat sparkPoolSizeCap (Capability *cap); @@ -32,46 +60,33 @@ INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap); #if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) -INLINE_HEADER rtsBool -emptySparkPool (StgSparkPool *pool) -{ - return (pool->hd == pool->tl); -} +INLINE_HEADER rtsBool +emptySparkPool (SparkPool *pool) +{ return looksEmpty(pool); } INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap) -{ return emptySparkPool(&cap->r.rSparks); } +{ return looksEmpty(cap->sparks); } INLINE_HEADER nat -sparkPoolSize (StgSparkPool *pool) +sparkPoolSize (SparkPool *pool) { - if (pool->hd <= pool->tl) { - return (pool->tl - pool->hd); - } else { - return (pool->lim - pool->hd + pool->tl - pool->base); - } + return (pool->bottom - pool->top); } INLINE_HEADER nat sparkPoolSizeCap (Capability *cap) -{ return sparkPoolSize(&cap->r.rSparks); } +{ return sparkPoolSize(cap->sparks); } INLINE_HEADER void -discardSparks (StgSparkPool *pool) +discardSparks (SparkPool *pool) { - pool->hd = pool->tl; + pool->top = pool->bottom = 0; } INLINE_HEADER void discardSparksCap (Capability *cap) -{ return discardSparks(&cap->r.rSparks); } - - -#elif defined(THREADED_RTS) - -INLINE_HEADER rtsBool -emptySparkPoolCap (Capability *cap STG_UNUSED) -{ return rtsTrue; } +{ return discardSparks(cap->sparks); } #endif |