author     berthold@mathematik.uni-marburg.de <unknown>  2008-09-15 13:28:46 +0000
committer  berthold@mathematik.uni-marburg.de <unknown>  2008-09-15 13:28:46 +0000
commit     cf9650f2a1690c04051c716124bb0350adc74ae7 (patch)
tree       f2e622b8eea04515fc4a19d24a1ecb57a654b33d /rts
parent     7eeac4d143e9287d7c2e27ba23b84d175df49962 (diff)
download   haskell-cf9650f2a1690c04051c716124bb0350adc74ae7.tar.gz
Work stealing for sparks
Spark stealing support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.

Spark pools are per capability, separately allocated and held in the Capability structure. The implementation uses Double-Ended Queues (deques) with cas-protected access.

The write end of the queue (position bottom) can only be used with mutual exclusion, i.e. by exactly one caller at a time. Multiple readers can steal()/findSpark() from the read end (position top), and are synchronised without a lock, based on a cas of the top position. One reader wins, the others return NULL to signal failure.

Work stealing is called when a Capability finds no other work (inside yieldCapability), and tries all capabilities 0..n-1 twice, unless a theft succeeds. Inside schedulePushWork, all considered capabilities (those which were idle and could be grabbed) are woken up. Future versions should wake up capabilities immediately when a new spark is put in the local pool, from newSpark().

The patch has been re-recorded due to conflicting bugfixes in Sparks.c, also fixing a (strange) conflict in the scheduler.
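A minimal, self-contained sketch of the cas-protected steal described above may help orient the reading of the diff below. This is an illustration, not the RTS code (which follows); the struct layout and the use of GCC's __sync builtin in place of the RTS cas() are assumptions:

    typedef struct {
        void          **elements;   /* circular buffer; size is a power of 2 */
        unsigned long   top;        /* read end, only ever incremented */
        unsigned long   bottom;     /* write end, owned by a single writer */
        unsigned long   moduloSize; /* size - 1, used as an index mask */
    } Deque;

    /* Many thieves may call this concurrently; exactly one wins the cas
       on top, the others (and callers seeing an empty deque) get NULL. */
    void *sketch_steal(Deque *q)
    {
        unsigned long b = q->bottom;
        unsigned long t = q->top;
        if (b == t)
            return NULL;                  /* looks empty, give up */
        void *candidate = q->elements[t & q->moduloSize];
        /* claim the slot by advancing top; losing the race means another
           thief already took this element */
        if (__sync_bool_compare_and_swap(&q->top, t, t + 1))
            return candidate;
        return NULL;
    }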
Diffstat (limited to 'rts')
-rw-r--r--  rts/Capability.c  |  57
-rw-r--r--  rts/Capability.h  |   4
-rw-r--r--  rts/Schedule.c    | 110
-rw-r--r--  rts/Sparks.c      | 517
-rw-r--r--  rts/Sparks.h      |  69
5 files changed, 582 insertions, 175 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 4d5748cec2..516aaa573d 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -54,6 +54,55 @@ globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
+rtsBool stealWork( Capability *cap) {
+ /* use the normal Sparks.h interface (internally modified to enable
+ concurrent stealing)
+ and immediately turn the spark into a thread when successful
+ */
+ Capability *robbed;
+ SparkPool *pool;
+ StgClosurePtr spark;
+ rtsBool success = rtsFalse;
+ nat i = 0;
+
+ debugTrace(DEBUG_sched,
+ "cap %d: Trying to steal work from other capabilities",
+ cap->no);
+
+ if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
+
+ /* visit capabilities 0..n-1 in sequence until a theft succeeds. We
+ could start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
+
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
+
+ spark = findSpark(robbed);
+
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ spark = findSpark(robbed); // lost race in concurrent access, try again
+ }
+ if (spark != NULL) {
+ debugTrace(DEBUG_sched,
+ "cap %d: Stole a spark from capability %d",
+ cap->no, robbed->no);
+
+ createSparkThread(cap,spark);
+ success = rtsTrue;
+ break; // got one, leave the loop
+ }
+ // otherwise: no success, try next one
+ }
+ debugTrace(DEBUG_sched,
+ "Leaving work stealing routine (%s)",
+ success?"one spark stolen":"thefts did not succeed");
+ return success;
+}
+
STATIC_INLINE rtsBool
anyWorkForMe( Capability *cap, Task *task )
{
@@ -73,9 +122,11 @@ anyWorkForMe( Capability *cap, Task *task )
if (emptyRunQueue(cap)) {
return !emptySparkPoolCap(cap)
|| !emptyWakeupQueue(cap)
- || globalWorkToDo();
- } else
+ || globalWorkToDo()
+ || stealWork(cap); /* if all false: try to steal work */
+ } else {
return cap->run_queue_hd->bound == NULL;
+ }
}
}
#endif
@@ -778,7 +829,7 @@ void
freeCapability (Capability *cap) {
stgFree(cap->mut_lists);
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
- freeSparkPool(&cap->r.rSparks);
+ freeSparkPool(cap->sparks);
#endif
}
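The comment in stealWork() above notes that the victim scan "could start at a random place instead of 0". A sketch of that variant, reusing stealWork()'s locals (cap, robbed, i); rand() from <stdlib.h> is an illustrative stand-in for whatever RNG the RTS would actually use:

    nat offset = (nat)(rand() % n_capabilities); /* random starting victim */
    for (i = 0; i < n_capabilities; i++) {
        robbed = &capabilities[(offset + i) % n_capabilities];
        if (cap == robbed || emptySparkPoolCap(robbed))
            continue;
        /* ... steal attempt exactly as in stealWork() above ... */
    }

A random starting point spreads thieves across victims instead of having every idle capability hammer capability 0's pool first.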
diff --git a/rts/Capability.h b/rts/Capability.h
index 70d9ee9e8d..59458951eb 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -23,6 +23,7 @@
#ifndef CAPABILITY_H
#define CAPABILITY_H
+#include "RtsTypes.h"
#include "RtsFlags.h"
#include "Task.h"
@@ -98,6 +99,9 @@ struct Capability_ {
StgTRecChunk *free_trec_chunks;
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
+
+ SparkPool *sparks;
+
}; // typedef Capability, defined in RtsAPI.h
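The SparkPool type itself is typedef'd elsewhere (RtsTypes.h, per the Sparks.h hunk below). Judging from the field accesses in Sparks.c, it is presumably laid out roughly like this; a reconstruction for orientation, not the actual declaration:

    typedef struct SparkPool_ {
        StgWord        size;        /* buffer capacity, a power of 2 */
        StgWord        moduloSize;  /* size - 1, used as an index mask */
        StgWord        top;         /* read end, cas-updated by thieves */
        StgWord        topBound;    /* writer-private cached bound on top */
        StgWord        bottom;      /* write end, touched only by the owner */
        StgClosurePtr *elements;    /* circular array of sparks */
    } SparkPool;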
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 626c097a82..09150fd8b5 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -137,17 +137,17 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
-#if defined(THREADED_RTS)
-static void schedulePushWork(Capability *cap, Task *task);
-#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static void schedulePushWork(Capability *cap, Task *task);
static rtsBool scheduleGetRemoteWork(Capability *cap);
+#if defined(PARALLEL_HASKELL)
static void scheduleSendPendingMessages(void);
+#endif
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -291,13 +291,15 @@ schedule (Capability *initialCapability, Task *task)
} else {
// Yield the capability to higher-priority tasks if necessary.
yieldCapability(&cap, task);
+ /* inside yieldCapability, an attempt is made to steal work from
+ other capabilities, unless this capability has work of its own.
+ See (REMARK) below.
+ */
}
#endif
-
-#if defined(THREADED_RTS)
- schedulePushWork(cap,task);
-#endif
+ /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
+
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
@@ -365,21 +367,7 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
-#if defined(THREADED_RTS)
- // If the run queue is empty, take a spark and turn it into a thread.
- {
- if (emptyRunQueue(cap)) {
- StgClosure *spark;
- spark = findSpark(cap);
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
- "turning spark of closure %p into a thread",
- (StgClosure *)spark);
- createSparkThread(cap,spark);
- }
- }
- }
-#endif // THREADED_RTS
+ /* this was the place to activate a spark, now below... */
scheduleStartSignalHandlers(cap);
@@ -393,11 +381,19 @@ schedule (Capability *initialCapability, Task *task)
scheduleCheckBlockedThreads(cap);
-#if defined(PARALLEL_HASKELL)
- /* message processing and work distribution goes here */
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ /* work distribution in multithreaded and parallel systems
+
+ REMARK: IMHO the best location for work-stealing as well.
+ The tests above might yield new jobs, so in some cases there is
+ no need to steal a spark. I believe the yieldCapability call
+ above should be moved here.
+ */
+#if defined(PARALLEL_HASKELL)
/* if messages have been buffered... a NOOP in THREADED_RTS */
scheduleSendPendingMessages();
+#endif
/* If the run queue is empty,...*/
if (emptyRunQueue(cap)) {
@@ -406,6 +402,7 @@ schedule (Capability *initialCapability, Task *task)
/* if this did not work, try to steal a spark from someone else */
if (emptyRunQueue(cap)) {
+#if defined(PARALLEL_HASKELL)
receivedFinish = scheduleGetRemoteWork(cap);
continue; // a new round, (hopefully) with new work
/*
@@ -414,10 +411,20 @@ schedule (Capability *initialCapability, Task *task)
b) (blocking) awaits and receives messages
in Eden, this is only the blocking receive, as b) in GUM.
+
+ in Threaded-RTS, this does nothing at all. The stealing routine
+ is inside Capability.c and is called from
+ yieldCapability() at the very beginning, see REMARK.
*/
+#endif
}
- }
+ } else { /* i.e. run queue was (initially) not empty */
+ schedulePushWork(cap,task);
+ /* work pushing, currently relevant only for THREADED_RTS:
+ (pushes threads, wakes up idle capabilities for stealing) */
+ }
+#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
either we never reach here or we definitely have work! */
// from here: non-empty run queue
@@ -430,7 +437,9 @@ schedule (Capability *initialCapability, Task *task)
above, waits for messages as well! */
processMessages(cap, &receivedFinish);
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL: non-empty run queue!
+
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
scheduleDetectDeadlock(cap,task);
#if defined(THREADED_RTS)
@@ -679,11 +688,15 @@ schedulePreLoop(void)
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
-#if defined(THREADED_RTS)
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
{
+ /* the following code is not for PARALLEL_HASKELL. I kept the call
+ general; future GUM versions might use pushing in a distributed setup */
+#if defined(THREADED_RTS)
+
Capability *free_caps[n_capabilities], *cap0;
nat i, n_free_caps;
@@ -726,7 +739,12 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
StgTSO *prev, *t, *next;
rtsBool pushed_to_all;
- debugTrace(DEBUG_sched, "excess threads on run queue and %d free capabilities, sharing...", n_free_caps);
+ debugTrace(DEBUG_sched,
+ "cap %d: %s and %d free capabilities, sharing...",
+ cap->no,
+ (!emptyRunQueue(cap) && cap->run_queue_hd->_link != END_TSO_QUEUE)?
+ "excess threads on run queue":"sparks to share (>=2)",
+ n_free_caps);
i = 0;
pushed_to_all = rtsFalse;
@@ -760,6 +778,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap->run_queue_tl = prev;
}
+#ifdef SPARK_PUSHING
+ /* JB I left this code in place, it would work but is not necessary */
+
// If there are some free capabilities that we didn't push any
// threads to, then try to push a spark to each one.
if (!pushed_to_all) {
@@ -775,16 +796,23 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
}
+#endif /* SPARK_PUSHING */
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
releaseCapability(free_caps[i]);
}
+ // now wake them all up; they might steal sparks if
+ // they did not get a thread
+ prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
+
+#endif /* THREADED_RTS */
+
}
-#endif
+#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
@@ -965,7 +993,7 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
* ------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL)
-static StgTSO *
+static void
scheduleSendPendingMessages(void)
{
@@ -984,10 +1012,10 @@ scheduleSendPendingMessages(void)
#endif
/* ----------------------------------------------------------------------------
- * Activate spark threads (PARALLEL_HASKELL only)
+ * Activate spark threads (PARALLEL_HASKELL and THREADED_RTS)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL)
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void
scheduleActivateSpark(Capability *cap)
{
@@ -1012,14 +1040,14 @@ scheduleActivateSpark(Capability *cap)
createSparkThread(cap,spark); // defined in Sparks.c
}
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL)
-static rtsBool
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+static rtsBool /* return value used in PARALLEL_HASKELL only */
scheduleGetRemoteWork(Capability *cap)
{
#if defined(PARALLEL_HASKELL)
@@ -1057,7 +1085,7 @@ scheduleGetRemoteWork(Capability *cap)
#endif /* PARALLEL_HASKELL */
}
-#endif // PARALLEL_HASKELL
+#endif // PARALLEL_HASKELL || THREADED_RTS
/* ----------------------------------------------------------------------------
* After running a thread...
@@ -1483,6 +1511,14 @@ scheduleDoGC (Capability *cap, Task *task USED_IF_THREADS, rtsBool force_major)
performHeapProfile = rtsFalse;
}
+#ifdef SPARKBALANCE
+ /* JB
+ Once we are all together... this would be the place to balance all
+ spark pools. No concurrent stealing or adding of new sparks can
+ occur. Should be defined in Sparks.c. */
+ balanceSparkPoolsCaps(n_capabilities, capabilities);
+#endif
+
#if defined(THREADED_RTS)
// release our stash of capabilities.
for (i = 0; i < n_capabilities; i++) {
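Condensed, the work-distribution logic in one scheduler iteration after this patch reads as follows (a paraphrase of the hunks above, not code from the patch):

    if (emptyRunQueue(cap)) {
        scheduleActivateSpark(cap);   /* turn a local spark into a thread */
    #if defined(PARALLEL_HASKELL)
        if (emptyRunQueue(cap))
            receivedFinish = scheduleGetRemoteWork(cap); /* blocking receive */
    #endif
        /* in THREADED_RTS, stealing happens earlier, inside
           yieldCapability() -> anyWorkForMe() -> stealWork() */
    } else {
        schedulePushWork(cap, task);  /* push threads, wake idle capabilities */
    }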
diff --git a/rts/Sparks.c b/rts/Sparks.c
index 2e9e61ca62..ac11172a9d 100644
--- a/rts/Sparks.c
+++ b/rts/Sparks.c
@@ -1,10 +1,39 @@
/* ---------------------------------------------------------------------------
*
- * (c) The GHC Team, 2000-2006
+ * (c) The GHC Team, 2000-2008
*
* Sparking support for PARALLEL_HASKELL and THREADED_RTS versions of the RTS.
*
- * -------------------------------------------------------------------------*/
+ * The implementation uses Double-Ended Queues with lock-free access
+ * (thereby often called "deque") as described in
+ *
+ * D.Chase and Y.Lev, Dynamic Circular Work-Stealing Deque.
+ * SPAA'05, July 2005, Las Vegas, USA.
+ * ACM 1-58113-986-1/05/0007
+ *
+ * Author: Jost Berthold MSRC 07-09/2008
+ *
+ * The DeQue is held as a circular array with known length. Positions
+ * of top (read-end) and bottom (write-end) always increase, and the
+ * array is accessed with indices modulo array-size. While this bears
+ * the risk of overflow, we assume that (with 64-bit indices) a
+ * program must run for a very long time to reach that point.
+ *
+ * The write end of the queue (position bottom) can only be used with
+ * mutual exclusion, i.e. by exactly one caller at a time. At this
+ * end, new items can be enqueued using pushBottom()/newSpark(), and
+ * removed using popBottom()/reclaimSpark() (the latter implying a cas
+ * synchronisation with potential concurrent readers for the case of
+ * just one element).
+ *
+ * Multiple readers can steal()/findSpark() from the read end
+ * (position top), and are synchronised without a lock, based on a cas
+ * of the top position. One reader wins, the others return NULL for a
+ * failure.
+ *
+ * Both popBottom and steal also return NULL when the queue is empty.
+ *
+ -------------------------------------------------------------------------*/
#include "PosixSource.h"
#include "Rts.h"
@@ -14,22 +43,52 @@
#include "RtsFlags.h"
#include "RtsUtils.h"
#include "ParTicky.h"
-# if defined(PARALLEL_HASKELL)
-# include "ParallelRts.h"
-# include "GranSimRts.h" // for GR_...
-# elif defined(GRAN)
-# include "GranSimRts.h"
-# endif
-#include "Sparks.h"
#include "Trace.h"
+#include "SMP.h" // for cas
+
+#include "Sparks.h"
+
#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
-static INLINE_ME void bump_hd (StgSparkPool *p)
-{ p->hd++; if (p->hd == p->lim) p->hd = p->base; }
+/* internal helpers ... */
+
+StgWord roundUp2(StgWord val);
-static INLINE_ME void bump_tl (StgSparkPool *p)
-{ p->tl++; if (p->tl == p->lim) p->tl = p->base; }
+StgWord roundUp2(StgWord val) {
+ StgWord rounded = 1;
+
+ /* StgWord is unsigned anyway, only catch 0 */
+ if (val == 0) {
+ barf("DeQue,roundUp2: invalid size 0 requested");
+ }
+ /* at least 1 bit set, shift up to its place */
+ do {
+ rounded = rounded << 1;
+ } while (0 != (val = val>>1));
+ return rounded;
+}
+
+INLINE_HEADER
+rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
+
+#if !defined(THREADED_RTS)
+/* missing def. in non THREADED RTS, and makes no sense anyway... */
+StgWord cas(StgPtr addr,StgWord old,StgWord new);
+StgWord cas(StgPtr addr,StgWord old,StgWord new) {
+ barf("cas: not implemented without multithreading");
+ old = new = *addr; /* to avoid gcc warnings */
+}
+#endif
+
+INLINE_HEADER
+rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
+ StgWord res = cas((StgPtr) addr, old, new);
+ return ((res == old));
+}
+
+/* or simply like this */
+#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
*
@@ -37,15 +96,28 @@ static INLINE_ME void bump_tl (StgSparkPool *p)
*
* -------------------------------------------------------------------------- */
-static void
-initSparkPool(StgSparkPool *pool)
-{
- pool->base = stgMallocBytes(RtsFlags.ParFlags.maxLocalSparks
- * sizeof(StgClosure *),
- "initSparkPools");
- pool->lim = pool->base + RtsFlags.ParFlags.maxLocalSparks;
- pool->hd = pool->base;
- pool->tl = pool->base;
+/* constructor */
+SparkPool* initPool(StgWord size) {
+
+ StgWord realsize;
+ SparkPool *q;
+
+ realsize = roundUp2(size); /* to compute modulo as a bitwise & */
+
+ q = (SparkPool*) stgMallocBytes(sizeof(SparkPool), /* admin fields */
+ "newSparkPool");
+ q->elements = (StgClosurePtr*)
+ stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
+ "newSparkPool:data space");
+ q->top=0;
+ q->bottom=0;
+ q->topBound=0; /* read by writer, updated each time top is read */
+
+ q->size = realsize; /* power of 2 */
+ q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
+
+ ASSERT_SPARK_POOL_INVARIANTS(q);
+ return q;
}
void
@@ -55,17 +127,71 @@ initSparkPools( void )
/* walk over the capabilities, allocating a spark pool for each one */
nat i;
for (i = 0; i < n_capabilities; i++) {
- initSparkPool(&capabilities[i].r.rSparks);
+ capabilities[i].sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
}
#else
/* allocate a single spark pool */
- initSparkPool(&MainCapability.r.rSparks);
+ MainCapability.sparks = initPool(RtsFlags.ParFlags.maxLocalSparks);
#endif
}
void
-freeSparkPool(StgSparkPool *pool) {
- stgFree(pool->base);
+freeSparkPool(SparkPool *pool) {
+ /* should not interfere with concurrent findSpark() calls! And
+ nobody should use the pointer any more. We cross our fingers...*/
+ stgFree(pool->elements);
+ stgFree(pool);
+}
+
+/* reclaimSpark(cap): remove a spark from the write end of the queue.
+ * Returns the removed spark, and NULL if a race is lost or the pool
+ * empty.
+ *
+ * If only one spark is left in the pool, we synchronise with
+ * concurrently stealing threads by using cas to modify the top field.
+ * This routine should NEVER be called by a task which does not own
+ * the capability. Can this be checked here?
+ */
+StgClosure* reclaimSpark(Capability *cap) {
+ SparkPool *deque = cap->sparks;
+ /* also a bit tricky, has to avoid concurrent steal() calls by
+ accessing top with cas, when there is only one element left */
+ StgWord t, b;
+ StgClosurePtr* pos;
+ long currSize;
+ StgClosurePtr removed;
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+
+ b = deque->bottom;
+ /* "decrement b as a test, see what happens" */
+ deque->bottom = --b;
+ pos = (deque->elements) + (b & (deque->moduloSize));
+ t = deque->top; /* using topBound would give an *upper* bound on the
+ current size; we need a lower bound. We use the real top here, but
+ can update the topBound value */
+ deque->topBound = t;
+ currSize = b - t;
+ if (currSize < 0) { /* was empty before decrementing b, set b
+ consistently and abort */
+ deque->bottom = t;
+ return NULL;
+ }
+ removed = *pos;
+ if (currSize > 0) { /* no danger, still elements in buffer after b-- */
+ return removed;
+ }
+ /* otherwise, has someone meanwhile stolen the same (last) element?
+ Check and increment top value to know */
+ if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+ removed = NULL; /* no success, but continue adjusting bottom */
+ }
+ deque->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
+ deque->topBound = t+1; /* ...and cached top value as well */
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+
+ return removed;
}
/* -----------------------------------------------------------------------------
@@ -73,32 +199,80 @@ freeSparkPool(StgSparkPool *pool) {
* findSpark: find a spark on the current Capability that we can fork
* into a thread.
*
- * -------------------------------------------------------------------------- */
+ * May be called by concurrent threads, which synchronise on top
+ * variable. Returns a spark, or NULL if pool empty or race lost.
+ *
+ -------------------------------------------------------------------------- */
+
+StgClosurePtr steal(SparkPool *deque);
+
+/* steal an element from the read end. Synchronises multiple callers
+ by failing with a NULL return. Also returns NULL when the deque is empty. */
+StgClosurePtr steal(SparkPool *deque) {
+ StgClosurePtr* pos;
+ StgClosurePtr* arraybase;
+ StgWord sz;
+ StgClosurePtr stolen;
+ StgWord b,t;
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+
+ b = deque->bottom;
+ t = deque->top;
+ if (b - t <= 0 ) {
+ return NULL; /* already looks empty, abort */
+ }
+
+ /* now access array, see pushBottom() */
+ arraybase = deque->elements;
+ sz = deque->moduloSize;
+ pos = arraybase + (t & sz);
+ stolen = *pos;
+
+ /* now decide whether we have won */
+ if ( !(CASTOP(&(deque->top),t,t+1)) ) {
+ /* lost the race, someone else has changed top in the meantime */
+ stolen = NULL;
+ } /* else: OK, top has been incremented by the cas call */
+
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+ /* return NULL or stolen element */
+ return stolen;
+}
StgClosure *
findSpark (Capability *cap)
{
- StgSparkPool *pool;
- StgClosure *spark;
+ SparkPool *deque = (cap->sparks);
+ StgClosure *stolen;
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+
+ do {
+ /* keep trying until good spark found or pool looks empty.
+ TODO is this a good idea? */
+
+ stolen = steal(deque);
- pool = &(cap->r.rSparks);
- ASSERT_SPARK_POOL_INVARIANTS(pool);
+ } while ( ( !stolen /* nothing stolen */
+ || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
+ && !looksEmpty(deque)); /* run empty, give up */
- while (pool->hd != pool->tl) {
- spark = *pool->hd;
- bump_hd(pool);
- if (closure_SHOULD_SPARK(spark)) {
-#ifdef GRAN
- if (RtsFlags.ParFlags.ParStats.Sparks)
- DumpRawGranEvent(CURRENT_PROC, CURRENT_PROC,
- GR_STEALING, ((StgTSO *)NULL), spark,
- 0, 0 /* spark_queue_len(ADVISORY_POOL) */);
-#endif
- return spark;
- }
- }
- // spark pool is now empty
- return NULL;
+ /* return stolen element */
+ return stolen;
+}
+
+
+/* "guesses" whether a deque is empty. Can return false negatives in
+ presence of concurrent steal() calls, and false positives in
+ presence of a concurrent pushBottom().*/
+rtsBool looksEmpty(SparkPool* deque) {
+ StgWord t = deque->top;
+ StgWord b = deque->bottom;
+ /* try to prefer false negatives by reading top first */
+ return (b - t <= 0);
+ /* => array is *never* completely filled, always 1 place free! */
}
/* -----------------------------------------------------------------------------
@@ -123,11 +297,64 @@ createSparkThread (Capability *cap, StgClosure *p)
* -------------------------------------------------------------------------- */
#define DISCARD_NEW
+void pushBottom(SparkPool* deque, StgClosurePtr elem);
+
+/* enqueue an element. Should always succeed by resizing the array
+ (not implemented yet, silently fails in that case). */
+void pushBottom(SparkPool* deque, StgClosurePtr elem) {
+ StgWord t;
+ StgClosurePtr* pos;
+ StgWord sz = deque->moduloSize;
+ StgWord b = deque->bottom;
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+
+ /* we try to avoid reading deque->top (accessed by all) and use
+ deque->topBound (accessed only by writer) instead.
+ This is why we do not just call empty(deque) here.
+ */
+ t = deque->topBound;
+ if ( b - t >= sz ) { /* nota bene: sz == deque->size - 1, thus ">=" */
+ /* could be full, check the real top value in this case */
+ t = deque->top;
+ deque->topBound = t;
+ if (b - t >= sz) { /* really no space left :-( */
+ /* reallocate the array, copying the values. Concurrent steal()s
+ will in the meantime use the old one and modify only top.
+ This means: we cannot safely free the old space! Can keep it
+ on a free list internally here...
+
+ Potential bug in combination with steal(): if array is
+ replaced, it is unclear which one concurrent steal operations
+ use. Must read the array base address in advance in steal().
+ */
+#if defined(DISCARD_NEW)
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+ return; /* for now, silently fail */
+#else
+ /* could make room by incrementing the top position here. In
+ * this case, should use CASTOP. If this fails, someone else has
+ * removed something, and new room will be available.
+ */
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+#endif
+ }
+ }
+ pos = (deque->elements) + (b & sz);
+ *pos = elem;
+ (deque->bottom)++;
+
+ ASSERT_SPARK_POOL_INVARIANTS(deque);
+ return;
+}
+
+/* this is called as a direct C-call from Stg => we need to keep the
+ pool in a register (???) */
StgInt
newSpark (StgRegTable *reg, StgClosure *p)
{
- StgSparkPool *pool = &(reg->rSparks);
+ SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
/* I am not sure whether this is the right thing to do.
* Maybe it is better to exploit the tag information
@@ -138,82 +365,125 @@ newSpark (StgRegTable *reg, StgClosure *p)
ASSERT_SPARK_POOL_INVARIANTS(pool);
if (closure_SHOULD_SPARK(p)) {
-#ifdef DISCARD_NEW
- StgClosure **new_tl;
- new_tl = pool->tl + 1;
- if (new_tl == pool->lim) { new_tl = pool->base; }
- if (new_tl != pool->hd) {
- *pool->tl = p;
- pool->tl = new_tl;
- } else if (!closure_SHOULD_SPARK(*pool->hd)) {
- // if the old closure is not sparkable, discard it and
- // keep the new one. Otherwise, keep the old one.
- *pool->tl = p;
- bump_hd(pool);
- }
-#else /* DISCARD OLD */
- *pool->tl = p;
- bump_tl(pool);
- if (pool->tl == pool->hd) { bump_hd(pool); }
-#endif
+ pushBottom(pool,p);
}
ASSERT_SPARK_POOL_INVARIANTS(pool);
return 1;
}
-/* -----------------------------------------------------------------------------
- * Mark all nodes pointed to by sparks in the spark queues (for GC) Does an
- * implicit slide i.e. after marking all sparks are at the beginning of the
- * spark pool and the spark pool only contains sparkable closures
+
+
+/* --------------------------------------------------------------------------
+ * Remove all sparks from the spark queues which should not spark any
+ * more. Called after GC. We assume exclusive access to the structure
+ * and replace all sparks in the queue, see explanation below. At exit,
+ * the spark pool only contains sparkable closures.
* -------------------------------------------------------------------------- */
static void
pruneSparkQueue (Capability *cap)
{
- StgClosure *spark, **sparkp, **to_sparkp;
+ SparkPool *pool;
+ StgClosurePtr spark, evacspark, *elements;
nat n, pruned_sparks; // stats only
- StgSparkPool *pool;
+ StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
PAR_TICKY_MARK_SPARK_QUEUE_START();
n = 0;
pruned_sparks = 0;
- pool = &(cap->r.rSparks);
+ pool = cap->sparks;
+ debugTrace(DEBUG_sched,
+ "markSparkQueue: current spark queue len=%d; (hd=%ld; tl=%ld)",
+ sparkPoolSize(pool), pool->bottom, pool->top);
ASSERT_SPARK_POOL_INVARIANTS(pool);
-
- sparkp = pool->hd;
- to_sparkp = pool->hd;
- while (sparkp != pool->tl) {
- ASSERT(*sparkp!=NULL);
- ASSERT(LOOKS_LIKE_CLOSURE_PTR(((StgClosure *)*sparkp)));
- // ToDo?: statistics gathering here (also for GUM!)
- spark = *sparkp;
- if (!closure_SHOULD_SPARK(spark)) {
- pruned_sparks++;
- } else{
- *to_sparkp++ = spark;
- if (to_sparkp == pool->lim) {
- to_sparkp = pool->base;
- }
- n++;
- }
- sparkp++;
- if (sparkp == pool->lim) {
- sparkp = pool->base;
- }
- }
- pool->tl = to_sparkp;
-
+
+ elements = pool->elements;
+
+ /* We have exclusive access to the structure here, so we can reset
+ bottom and top counters, and prune invalid sparks. Contents are
+ copied in-place if they are valuable, otherwise discarded. The
+ routine uses "real" array indices, computed as top and bottom
+ modulo the array size.
+
+ Copying:
+
+ At the beginning, the pool structure can look like this:
+ ( bottom % size >= top % size , no wrap-around)
+ t b
+ ___________***********_________________
+
+ or like this ( bottom % size < top % size, wrap-around )
+ b t
+ ***********__________******************
+ As we need to remove useless sparks anyway, we make one pass
+ between t and b, moving valuable content to b and subsequent
+ cells (wrapping around when the size is reached).
+
+ b t
+ ***********OOO_______XX_X__X?**********
+ ^____move?____/
+
+ After this movement, botInd becomes the new bottom, and old
+ bottom becomes the new top index, both as indices in the array
+ size range.
+ */
+ // starting here
+ currInd = (pool->top) & (pool->moduloSize); // mod
+
+ // copies of evacuated closures go to space from botInd on
+ // we keep oldBotInd to know when to stop
+ oldBotInd = botInd = (pool->bottom) & (pool->moduloSize); // mod
+
+ // on entry to loop, we are within the bounds
+ ASSERT( currInd < pool->size && botInd < pool->size );
+
+ while (currInd != oldBotInd ) {
+ /* must use != here, wrap-around at size
+ subtle: loop not entered if queue empty
+ */
+
+ /* check element at currInd. if valuable, evacuate and move to
+ botInd, otherwise move on */
+ spark = elements[currInd];
+
+ /* if valuable work: shift inside the pool */
+ if ( closure_SHOULD_SPARK(spark) ) {
+ elements[botInd] = spark; // keep entry (new address)
+ botInd++;
+ n++;
+ } else {
+ pruned_sparks++; // discard spark
+ }
+ currInd++;
+
+ // in the loop, we may reach the bounds, and instantly wrap around
+ ASSERT( currInd <= pool->size && botInd <= pool->size );
+ if ( currInd == pool->size ) { currInd = 0; }
+ if ( botInd == pool->size ) { botInd = 0; }
+
+ } // while-loop over spark pool elements
+
+ ASSERT(currInd == oldBotInd);
+
+ pool->top = oldBotInd; // where we started writing
+ pool->topBound = pool->top;
+
+ pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
+ // first free place we did not use (corrected by wraparound)
+
PAR_TICKY_MARK_SPARK_QUEUE_END(n);
-
+
debugTrace(DEBUG_sched, "pruned %d sparks", pruned_sparks);
debugTrace(DEBUG_sched,
- "new spark queue len=%d; (hd=%p; tl=%p)",
- sparkPoolSize(pool), pool->hd, pool->tl);
+ "new spark queue len=%d; (hd=%ld; tl=%ld)",
+ sparkPoolSize(pool), pool->bottom, pool->top);
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
}
void
@@ -225,21 +495,50 @@ pruneSparkQueues (void)
}
}
+/* GC for the spark pool, called inside Capability.c for all
+ capabilities in turn. Blindly "evac"s the complete spark pool. */
void
traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
{
StgClosure **sparkp;
- StgSparkPool *pool;
+ SparkPool *pool;
+ StgWord top,bottom, modMask;
- pool = &(cap->r.rSparks);
- sparkp = pool->hd;
- while (sparkp != pool->tl) {
- evac(user, sparkp);
- sparkp++;
- if (sparkp == pool->lim) {
- sparkp = pool->base;
- }
+ pool = cap->sparks;
+
+ ASSERT_SPARK_POOL_INVARIANTS(pool);
+
+ top = pool->top;
+ bottom = pool->bottom;
+ sparkp = pool->elements;
+ modMask = pool->moduloSize;
+
+ while (top < bottom) {
+ /* call evac for all closures in range (wrap-around via modulo)
+ * In GHC-6.10, evac takes an additional 1st argument to hold a
+ * GC-specific register, see rts/sm/GC.c::mark_root()
+ */
+ evac( user , sparkp + (top & modMask) );
+ top++;
}
+
+ debugTrace(DEBUG_sched,
+ "traversed spark queue, len=%d; (hd=%ld; tl=%ld)",
+ sparkPoolSize(pool), pool->bottom, pool->top);
+}
+
+/* ----------------------------------------------------------------------------
+
+ * balanceSparkPoolsCaps: takes an array of capabilities (usually: all
+ * capabilities) and its size. Accesses all spark pools and equally
+ * distributes the sparks among them.
+ *
+ * Could be called after GC, before Cap. release, from scheduler.
+ * -------------------------------------------------------------------------- */
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
+
+void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+ barf("not implemented");
}
#else
@@ -259,6 +558,8 @@ newSpark (StgRegTable *reg STG_UNUSED, StgClosure *p STG_UNUSED)
*
* GRAN & PARALLEL_HASKELL stuff beyond here.
*
+ * TODO "nuke" this!
+ *
* -------------------------------------------------------------------------- */
#if defined(PARALLEL_HASKELL) || defined(GRAN)
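The point of roundUp2() above is that a power-of-two buffer size turns the modulo in every index computation into a bitwise AND. A worked example: roundUp2(100) returns 128, so moduloSize is 127 (0x7f), and position 300 maps to slot 300 & 127 == 300 % 128 == 44. Note also that roundUp2 always shifts past the highest set bit, so an exact power of two is doubled (roundUp2(128) == 256), a harmless over-allocation. The mapping itself, written out as a hypothetical helper (not in the patch, which inlines the expression):

    /* illustration of the index mapping used throughout Sparks.c:
       with size a power of 2, (pos & moduloSize) == pos % size */
    StgClosurePtr* slotFor(SparkPool *q, StgWord pos) {
        return q->elements + (pos & q->moduloSize);
    }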
diff --git a/rts/Sparks.h b/rts/Sparks.h
index 8e0ba90f3d..dbbf268988 100644
--- a/rts/Sparks.h
+++ b/rts/Sparks.h
@@ -9,17 +9,45 @@
#ifndef SPARKS_H
#define SPARKS_H
+#if defined(PARALLEL_HASKELL)
+#error Sparks.c using new internal structure, needs major overhaul!
+#endif
+
+/* typedef for SparkPool in RtsTypes.h */
+
#if defined(THREADED_RTS)
+
+/* INVARIANTS, in this order: bottom/top consistent, reasonable size,
+ topBound consistent, space pointer, space accessible to us */
+#define ASSERT_SPARK_POOL_INVARIANTS(p) \
+ ASSERT((p)->bottom >= (p)->top); \
+ ASSERT((p)->size > 0); \
+ ASSERT((p)->size > (p)->bottom - (p)->top); \
+ ASSERT((p)->topBound <= (p)->top); \
+ ASSERT((p)->elements != NULL); \
+ ASSERT(*((p)->elements) || 1); \
+ ASSERT(*((p)->elements - 1 + ((p)->size)) || 1);
+
+// missing in old interface. Currently called by initSparkPools
+// internally.
+SparkPool* initPool(StgWord size);
+
+// special case: accessing our own pool, at the write end
+// otherwise, we can always steal from our pool as the others do...
+StgClosure* reclaimSpark(Capability *cap);
+
+rtsBool looksEmpty(SparkPool* deque);
+
+// rest: same as old interface
StgClosure * findSpark (Capability *cap);
void initSparkPools (void);
-void freeSparkPool (StgSparkPool *pool);
+void freeSparkPool (SparkPool *pool);
void createSparkThread (Capability *cap, StgClosure *p);
void pruneSparkQueues (void);
void traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
-INLINE_HEADER void discardSparks (StgSparkPool *pool);
-INLINE_HEADER nat sparkPoolSize (StgSparkPool *pool);
-INLINE_HEADER rtsBool emptySparkPool (StgSparkPool *pool);
+INLINE_HEADER void discardSparks (SparkPool *pool);
+INLINE_HEADER nat sparkPoolSize (SparkPool *pool);
INLINE_HEADER void discardSparksCap (Capability *cap);
INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
@@ -32,46 +60,33 @@ INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
-INLINE_HEADER rtsBool
-emptySparkPool (StgSparkPool *pool)
-{
- return (pool->hd == pool->tl);
-}
+INLINE_HEADER rtsBool
+emptySparkPool (SparkPool *pool)
+{ return looksEmpty(pool); }
INLINE_HEADER rtsBool
emptySparkPoolCap (Capability *cap)
-{ return emptySparkPool(&cap->r.rSparks); }
+{ return looksEmpty(cap->sparks); }
INLINE_HEADER nat
-sparkPoolSize (StgSparkPool *pool)
+sparkPoolSize (SparkPool *pool)
{
- if (pool->hd <= pool->tl) {
- return (pool->tl - pool->hd);
- } else {
- return (pool->lim - pool->hd + pool->tl - pool->base);
- }
+ return (pool->bottom - pool->top);
}
INLINE_HEADER nat
sparkPoolSizeCap (Capability *cap)
-{ return sparkPoolSize(&cap->r.rSparks); }
+{ return sparkPoolSize(cap->sparks); }
INLINE_HEADER void
-discardSparks (StgSparkPool *pool)
+discardSparks (SparkPool *pool)
{
- pool->hd = pool->tl;
+ pool->top = pool->bottom = 0;
}
INLINE_HEADER void
discardSparksCap (Capability *cap)
-{ return discardSparks(&cap->r.rSparks); }
-
-
-#elif defined(THREADED_RTS)
-
-INLINE_HEADER rtsBool
-emptySparkPoolCap (Capability *cap STG_UNUSED)
-{ return rtsTrue; }
+{ return discardSparks(cap->sparks); }
#endif
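A final note on the comparisons "b - t <= 0" in steal() and looksEmpty(): StgWord is unsigned, so the expression can only be true when b == t, which is exactly the empty case given the asserted invariant bottom >= top (see ASSERT_SPARK_POOL_INVARIANTS above). If bottom could ever trail top, a signed difference would be needed, as reclaimSpark() already uses for its currSize. A sketch of that signed variant:

    /* a signed emptiness test, in the style of reclaimSpark()'s
       'long currSize'; robust even if bottom were ever below top */
    rtsBool isEmptySigned(SparkPool *q) {
        long currSize = (long)q->bottom - (long)q->top;
        return (currSize <= 0) ? rtsTrue : rtsFalse;
    }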