summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2008-10-22 09:27:44 +0000
committerSimon Marlow <marlowsd@gmail.com>2008-10-22 09:27:44 +0000
commit99df892cc9620fcc92747b79bba75dad8a1d295c (patch)
tree536df57e1d9975f88ce781627bb2dacaee5b2c0c /rts
parentcf9650f2a1690c04051c716124bb0350adc74ae7 (diff)
downloadhaskell-99df892cc9620fcc92747b79bba75dad8a1d295c.tar.gz
Refactoring and reorganisation of the scheduler
Change the way we look for work in the scheduler. Previously, checking to see whether there was anything to do was a non-side-effecting operation, but this has changed now that we do work-stealing. This lead to a refactoring of the inner loop of the scheduler. Also, lots of cleanup in the new work-stealing code, but no functional changes. One new statistic is added to the +RTS -s output: SPARKS: 1430 (2 converted, 1427 pruned) lets you know something about the use of `par` in the program.
Diffstat (limited to 'rts')
-rw-r--r--rts/Capability.c125
-rw-r--r--rts/Capability.h43
-rw-r--r--rts/Schedule.c231
-rw-r--r--rts/Sparks.c125
-rw-r--r--rts/Sparks.h77
-rw-r--r--rts/Stable.c3
-rw-r--r--rts/Stats.c15
7 files changed, 348 insertions, 271 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 516aaa573d..948922a3b2 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -54,15 +54,17 @@ globalWorkToDo (void)
#endif
#if defined(THREADED_RTS)
-rtsBool stealWork( Capability *cap) {
+rtsBool
+stealWork (Capability *cap)
+{
/* use the normal Sparks.h interface (internally modified to enable
concurrent stealing)
and immediately turn the spark into a thread when successful
*/
Capability *robbed;
- SparkPool *pool;
StgClosurePtr spark;
rtsBool success = rtsFalse;
+ rtsBool retry;
nat i = 0;
debugTrace(DEBUG_sched,
@@ -71,63 +73,40 @@ rtsBool stealWork( Capability *cap) {
if (n_capabilities == 1) { return rtsFalse; } // makes no sense...
- /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
- start at a random place instead of 0 as well. */
- for ( i=0 ; i < n_capabilities ; i++ ) {
- robbed = &capabilities[i];
- if (cap == robbed) // ourselves...
- continue;
+ do {
+ retry = rtsFalse;
- if (emptySparkPoolCap(robbed)) // nothing to steal here
- continue;
-
- spark = findSpark(robbed);
+ /* visit cap.s 0..n-1 in sequence until a theft succeeds. We could
+ start at a random place instead of 0 as well. */
+ for ( i=0 ; i < n_capabilities ; i++ ) {
+ robbed = &capabilities[i];
+ if (cap == robbed) // ourselves...
+ continue;
- if (spark == NULL && !emptySparkPoolCap(robbed)) {
- spark = findSpark(robbed); // lost race in concurrent access, try again
- }
- if (spark != NULL) {
- debugTrace(DEBUG_sched,
+ if (emptySparkPoolCap(robbed)) // nothing to steal here
+ continue;
+
+ spark = tryStealSpark(robbed->sparks);
+ if (spark == NULL && !emptySparkPoolCap(robbed)) {
+ // we conflicted with another thread while trying to steal;
+ // try again later.
+ retry = rtsTrue;
+ }
+
+ if (spark != NULL) {
+ debugTrace(DEBUG_sched,
"cap %d: Stole a spark from capability %d",
- cap->no, robbed->no);
+ cap->no, robbed->no);
- createSparkThread(cap,spark);
- success = rtsTrue;
- break; // got one, leave the loop
- }
- // otherwise: no success, try next one
- }
- debugTrace(DEBUG_sched,
- "Leaving work stealing routine (%s)",
- success?"one spark stolen":"thefts did not succeed");
- return success;
-}
+ createSparkThread(cap,spark);
+ return rtsTrue;
+ }
+ // otherwise: no success, try next one
+ }
+ } while (retry);
-STATIC_INLINE rtsBool
-anyWorkForMe( Capability *cap, Task *task )
-{
- if (task->tso != NULL) {
- // A bound task only runs if its thread is on the run queue of
- // the capability on which it was woken up. Otherwise, we
- // can't be sure that we have the right capability: the thread
- // might be woken up on some other capability, and task->cap
- // could change under our feet.
- return !emptyRunQueue(cap) && cap->run_queue_hd->bound == task;
- } else {
- // A vanilla worker task runs if either there is a lightweight
- // thread at the head of the run queue, or the run queue is
- // empty and (there are sparks to execute, or there is some
- // other global condition to check, such as threads blocked on
- // blackholes).
- if (emptyRunQueue(cap)) {
- return !emptySparkPoolCap(cap)
- || !emptyWakeupQueue(cap)
- || globalWorkToDo()
- || stealWork(cap); /* if all false: try to steal work */
- } else {
- return cap->run_queue_hd->bound == NULL;
- }
- }
+ debugTrace(DEBUG_sched, "No sparks stolen");
+ return rtsFalse;
}
#endif
@@ -194,6 +173,9 @@ initCapability( Capability *cap, nat i )
cap->returning_tasks_tl = NULL;
cap->wakeup_queue_hd = END_TSO_QUEUE;
cap->wakeup_queue_tl = END_TSO_QUEUE;
+ cap->sparks_created = 0;
+ cap->sparks_converted = 0;
+ cap->sparks_pruned = 0;
#endif
cap->f.stgGCEnter1 = (F_)__stg_gc_enter_1;
@@ -326,7 +308,8 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
#if defined(THREADED_RTS)
void
-releaseCapability_ (Capability* cap)
+releaseCapability_ (Capability* cap,
+ rtsBool always_wakeup)
{
Task *task;
@@ -384,8 +367,9 @@ releaseCapability_ (Capability* cap)
// If we have an unbound thread on the run queue, or if there's
// anything else to do, give the Capability to a worker thread.
- if (!emptyRunQueue(cap) || !emptyWakeupQueue(cap)
- || !emptySparkPoolCap(cap) || globalWorkToDo()) {
+ if (always_wakeup ||
+ !emptyRunQueue(cap) || !emptyWakeupQueue(cap) ||
+ !emptySparkPoolCap(cap) || globalWorkToDo()) {
if (cap->spare_workers) {
giveCapabilityToTask(cap,cap->spare_workers);
// The worker Task pops itself from the queue;
@@ -401,7 +385,15 @@ void
releaseCapability (Capability* cap USED_IF_THREADS)
{
ACQUIRE_LOCK(&cap->lock);
- releaseCapability_(cap);
+ releaseCapability_(cap, rtsFalse);
+ RELEASE_LOCK(&cap->lock);
+}
+
+void
+releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
+{
+ ACQUIRE_LOCK(&cap->lock);
+ releaseCapability_(cap, rtsTrue);
RELEASE_LOCK(&cap->lock);
}
@@ -427,7 +419,7 @@ releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
}
// Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
}
@@ -534,16 +526,6 @@ yieldCapability (Capability** pCap, Task *task)
{
Capability *cap = *pCap;
- // The fast path has no locking, if we don't enter this while loop
-
- while ( waiting_for_gc
- /* i.e. another capability triggered HeapOverflow, is busy
- getting capabilities (stopping their owning tasks) */
- || cap->returning_tasks_hd != NULL
- /* cap reserved for another task */
- || !anyWorkForMe(cap,task)
- /* cap/task have no work */
- ) {
debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
// We must now release the capability and wait to be woken up
@@ -588,7 +570,6 @@ yieldCapability (Capability** pCap, Task *task)
trace(TRACE_sched | DEBUG_sched, "resuming capability %d", cap->no);
ASSERT(cap->running_task == task);
- }
*pCap = cap;
@@ -630,7 +611,7 @@ wakeupThreadOnCapability (Capability *my_cap,
appendToRunQueue(other_cap,tso);
trace(TRACE_sched, "resuming capability %d", other_cap->no);
- releaseCapability_(other_cap);
+ releaseCapability_(other_cap,rtsFalse);
} else {
appendToWakeupQueue(my_cap,other_cap,tso);
other_cap->context_switch = 1;
@@ -765,7 +746,7 @@ shutdownCapability (Capability *cap, Task *task, rtsBool safe)
if (!emptyRunQueue(cap) || cap->spare_workers) {
debugTrace(DEBUG_sched,
"runnable threads or workers still alive, yielding");
- releaseCapability_(cap); // this will wake up a worker
+ releaseCapability_(cap,rtsFalse); // this will wake up a worker
RELEASE_LOCK(&cap->lock);
yieldThread();
continue;
diff --git a/rts/Capability.h b/rts/Capability.h
index 59458951eb..779a1945a5 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -23,9 +23,9 @@
#ifndef CAPABILITY_H
#define CAPABILITY_H
-#include "RtsTypes.h"
#include "RtsFlags.h"
#include "Task.h"
+#include "Sparks.h"
struct Capability_ {
// State required by the STG virtual machine when running Haskell
@@ -91,6 +91,13 @@ struct Capability_ {
// woken up by another Capability.
StgTSO *wakeup_queue_hd;
StgTSO *wakeup_queue_tl;
+
+ SparkPool *sparks;
+
+ // Stats on spark creation/conversion
+ nat sparks_created;
+ nat sparks_converted;
+ nat sparks_pruned;
#endif
// Per-capability STM-related data
@@ -100,8 +107,6 @@ struct Capability_ {
StgTRecHeader *free_trec_headers;
nat transaction_tokens;
- SparkPool *sparks;
-
}; // typedef Capability, defined in RtsAPI.h
@@ -147,12 +152,16 @@ void initCapabilities (void);
// ASSUMES: cap->running_task is the current Task.
//
#if defined(THREADED_RTS)
-void releaseCapability (Capability* cap);
-void releaseCapability_ (Capability* cap); // assumes cap->lock is held
+void releaseCapability (Capability* cap);
+void releaseAndWakeupCapability (Capability* cap);
+void releaseCapability_ (Capability* cap, rtsBool always_wakeup);
+// assumes cap->lock is held
#else
// releaseCapability() is empty in non-threaded RTS
INLINE_HEADER void releaseCapability (Capability* cap STG_UNUSED) {};
-INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED) {};
+INLINE_HEADER void releaseAndWakeupCapability (Capability* cap STG_UNUSED) {};
+INLINE_HEADER void releaseCapability_ (Capability* cap STG_UNUSED,
+ rtsBool always_wakeup STG_UNUSED) {};
#endif
#if !IN_STG_CODE
@@ -231,6 +240,14 @@ void shutdownCapability (Capability *cap, Task *task, rtsBool wait_foreign);
//
rtsBool tryGrabCapability (Capability *cap, Task *task);
+// Try to steal a spark from other Capabilities
+//
+rtsBool stealWork (Capability *cap);
+
+INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
+INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
+INLINE_HEADER void discardSparksCap (Capability *cap);
+
#else // !THREADED_RTS
// Grab a capability. (Only in the non-threaded RTS; in the threaded
@@ -273,4 +290,18 @@ recordMutableCap (StgClosure *p, Capability *cap, nat gen)
*bd->free++ = (StgWord)p;
}
+#if defined(THREADED_RTS)
+INLINE_HEADER rtsBool
+emptySparkPoolCap (Capability *cap)
+{ return looksEmpty(cap->sparks); }
+
+INLINE_HEADER nat
+sparkPoolSizeCap (Capability *cap)
+{ return sparkPoolSize(cap->sparks); }
+
+INLINE_HEADER void
+discardSparksCap (Capability *cap)
+{ return discardSparks(cap->sparks); }
+#endif
+
#endif /* CAPABILITY_H */
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 09150fd8b5..e17c6534d6 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
+#if defined(THREADED_RTS)
+static void scheduleYield (Capability **pcap, Task *task);
+#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
-static rtsBool scheduleGetRemoteWork(Capability *cap);
#if defined(PARALLEL_HASKELL)
+static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task)
while (TERMINATION_CONDITION) {
-#if defined(THREADED_RTS)
- if (first) {
- // don't yield the first time, we want a chance to run this
- // thread for a bit, even if there are others banging at the
- // door.
- first = rtsFalse;
- ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- } else {
- // Yield the capability to higher-priority tasks if necessary.
- yieldCapability(&cap, task);
- /* inside yieldCapability, attempts to steal work from other
- capabilities, unless the capability has own work.
- See (REMARK) below.
- */
- }
-#endif
-
- /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
-
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
@@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
- /* this was the place to activate a spark, now below... */
-
- scheduleStartSignalHandlers(cap);
+ scheduleFindWork(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
- scheduleCheckWakeupThreads(cap);
-
- scheduleCheckBlockedThreads(cap);
-
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
- /* work distribution in multithreaded and parallel systems
-
- REMARK: IMHO best location for work-stealing as well.
- tests above might yield some new jobs, so no need to steal a
- spark in some cases. I believe the yieldCapability.. above
- should be moved here.
- */
-
-#if defined(PARALLEL_HASKELL)
- /* if messages have been buffered... a NOOP in THREADED_RTS */
- scheduleSendPendingMessages();
-#endif
-
- /* If the run queue is empty,...*/
- if (emptyRunQueue(cap)) {
- /* ...take one of our own sparks and turn it into a thread */
- scheduleActivateSpark(cap);
-
- /* if this did not work, try to steal a spark from someone else */
- if (emptyRunQueue(cap)) {
-#if defined(PARALLEL_HASKELL)
- receivedFinish = scheduleGetRemoteWork(cap);
- continue; // a new round, (hopefully) with new work
- /*
- in GUM, this a) sends out a FISH and returns IF no fish is
- out already
- b) (blocking) awaits and receives messages
-
- in Eden, this is only the blocking receive, as b) in GUM.
-
- in Threaded-RTS, this does plain nothing. Stealing routine
- is inside Capability.c and called from
- yieldCapability() at the very beginning, see REMARK.
- */
-#endif
- }
- } else { /* i.e. run queue was (initially) not empty */
- schedulePushWork(cap,task);
- /* work pushing, currently relevant only for THREADED_RTS:
- (pushes threads, wakes up idle capabilities for stealing) */
- }
+ /* work pushing, currently relevant only for THREADED_RTS:
+ (pushes threads, wakes up idle capabilities for stealing) */
+ schedulePushWork(cap,task);
#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
@@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task)
}
#endif // PARALLEL_HASKELL: non-empty run queue!
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
-
scheduleDetectDeadlock(cap,task);
+
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
@@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task)
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
- if ( emptyRunQueue(cap) ) {
+
+#if defined(THREADED_RTS)
+ if (first)
+ {
+ // XXX: ToDo
+ // // don't yield the first time, we want a chance to run this
+ // // thread for a bit, even if there are others banging at the
+ // // door.
+ // first = rtsFalse;
+ // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ }
+
+ scheduleYield(&cap,task);
+ if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+ if ( emptyRunQueue(cap) ) {
ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
- continue; // nothing to do
}
+#endif
//
// Get a thread to run
@@ -683,12 +631,110 @@ schedulePreLoop(void)
}
/* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+ scheduleStartSignalHandlers(cap);
+
+ // Only check the black holes here if we've nothing else to do.
+ // During normal execution, the black hole list only gets checked
+ // at GC time, to avoid repeatedly traversing this possibly long
+ // list each time around the scheduler.
+ if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+ scheduleCheckWakeupThreads(cap);
+
+ scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ // Try to activate one of our own sparks
+ if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(THREADED_RTS)
+ // Try to steak work if we don't have any
+ if (emptyRunQueue(cap)) { stealWork(cap); }
+#endif
+
+#if defined(PARALLEL_HASKELL)
+ // if messages have been buffered...
+ scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+ if (emptyRunQueue(cap)) {
+ receivedFinish = scheduleGetRemoteWork(cap);
+ continue; // a new round, (hopefully) with new work
+ /*
+ in GUM, this a) sends out a FISH and returns IF no fish is
+ out already
+ b) (blocking) awaits and receives messages
+
+ in Eden, this is only the blocking receive, as b) in GUM.
+ */
+ }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+ // we need to yield this capability to someone else if..
+ // - another thread is initiating a GC
+ // - another Task is returning from a foreign call
+ // - the thread at the head of the run queue cannot be run
+ // by this Task (it is bound to another Task, or it is unbound
+ // and this task it bound).
+ return (waiting_for_gc ||
+ cap->returning_tasks_hd != NULL ||
+ (!emptyRunQueue(cap) && (task->tso == NULL
+ ? cap->run_queue_hd->bound != NULL
+ : cap->run_queue_hd->bound != task)));
+}
+
+// This is the single place where a Task goes to sleep. There are
+// two reasons it might need to sleep:
+// - there are no threads to run
+// - we need to yield this Capability to someone else
+// (see shouldYieldCapability())
+//
+// The return value indicates whether
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+ Capability *cap = *pcap;
+
+ // if we have work, and we don't need to give up the Capability, continue.
+ if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
+ return;
+
+ // otherwise yield (sleep), and keep yielding if necessary.
+ do {
+ yieldCapability(&cap,task);
+ }
+ while (shouldYieldCapability(cap,task));
+
+ // note there may still be no threads on the run queue at this
+ // point, the caller has to check.
+
+ *pcap = cap;
+ return;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
* schedulePushWork()
*
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
@@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// i is the next free capability to push to
for (; i < n_free_caps; i++) {
if (emptySparkPoolCap(free_caps[i])) {
- spark = findSpark(cap);
+ spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
newSpark(&(free_caps[i]->r), spark);
@@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
- releaseCapability(free_caps[i]);
+ releaseAndWakeupCapability(free_caps[i]);
}
- // now wake them all up, and they might steal sparks if
- // the did not get a thread
- prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */
}
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
@@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap)
on our run queue in the meantime ? But would need a lock.. */
return;
- spark = findSpark(cap); // defined in Sparks.c
+
+ // Really we should be using reclaimSpark() here, but
+ // experimentally it doesn't seem to perform as well as just
+ // stealing from our own spark pool:
+ // spark = reclaimSpark(cap->sparks);
+ spark = tryStealSpark(cap->sparks); // defined in Sparks.c
if (spark != NULL) {
debugTrace(DEBUG_sched,
@@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap)
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(PARALLEL_HASKELL)
static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork(Capability *cap)
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
{
#if defined(PARALLEL_HASKELL)
rtsBool receivedFinish = rtsFalse;
@@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg)
suspendTask(cap,task);
cap->in_haskell = rtsFalse;
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);
diff --git a/rts/Sparks.c b/rts/Sparks.c
index ac11172a9d..360ea41a05 100644
--- a/rts/Sparks.c
+++ b/rts/Sparks.c
@@ -53,9 +53,9 @@
/* internal helpers ... */
-StgWord roundUp2(StgWord val);
-
-StgWord roundUp2(StgWord val) {
+static StgWord
+roundUp2(StgWord val)
+{
StgWord rounded = 1;
/* StgWord is unsigned anyway, only catch 0 */
@@ -69,25 +69,6 @@ StgWord roundUp2(StgWord val) {
return rounded;
}
-INLINE_HEADER
-rtsBool casTop(StgPtr addr, StgWord old, StgWord new);
-
-#if !defined(THREADED_RTS)
-/* missing def. in non THREADED RTS, and makes no sense anyway... */
-StgWord cas(StgPtr addr,StgWord old,StgWord new);
-StgWord cas(StgPtr addr,StgWord old,StgWord new) {
- barf("cas: not implemented without multithreading");
- old = new = *addr; /* to avoid gcc warnings */
-}
-#endif
-
-INLINE_HEADER
-rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
- StgWord res = cas((StgPtr) addr, old, new);
- return ((res == old));
-}
-
-/* or simply like this */
#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
/* -----------------------------------------------------------------------------
@@ -97,8 +78,9 @@ rtsBool casTop(StgWord* addr, StgWord old, StgWord new) {
* -------------------------------------------------------------------------- */
/* constructor */
-SparkPool* initPool(StgWord size) {
-
+static SparkPool*
+initPool(StgWord size)
+{
StgWord realsize;
SparkPool *q;
@@ -136,14 +118,17 @@ initSparkPools( void )
}
void
-freeSparkPool(SparkPool *pool) {
+freeSparkPool (SparkPool *pool)
+{
/* should not interfere with concurrent findSpark() calls! And
nobody should use the pointer any more. We cross our fingers...*/
stgFree(pool->elements);
stgFree(pool);
}
-/* reclaimSpark(cap): remove a spark from the write end of the queue.
+/* -----------------------------------------------------------------------------
+ *
+ * reclaimSpark: remove a spark from the write end of the queue.
* Returns the removed spark, and NULL if a race is lost or the pool
* empty.
*
@@ -151,9 +136,12 @@ freeSparkPool(SparkPool *pool) {
* concurrently stealing threads by using cas to modify the top field.
* This routine should NEVER be called by a task which does not own
* the capability. Can this be checked here?
- */
-StgClosure* reclaimSpark(Capability *cap) {
- SparkPool *deque = cap->sparks;
+ *
+ * -------------------------------------------------------------------------- */
+
+StgClosure *
+reclaimSpark (SparkPool *deque)
+{
/* also a bit tricky, has to avoid concurrent steal() calls by
accessing top with cas, when there is only one element left */
StgWord t, b;
@@ -196,19 +184,17 @@ StgClosure* reclaimSpark(Capability *cap) {
/* -----------------------------------------------------------------------------
*
- * findSpark: find a spark on the current Capability that we can fork
- * into a thread.
+ * tryStealSpark: try to steal a spark from a Capability.
*
- * May be called by concurrent threads, which synchronise on top
- * variable. Returns a spark, or NULL if pool empty or race lost.
+ * Returns a valid spark, or NULL if the pool was empty, and can
+ * occasionally return NULL if there was a race with another thread
+ * stealing from the same pool. In this case, try again later.
*
-------------------------------------------------------------------------- */
-StgClosurePtr steal(SparkPool *deque);
-
-/* steal an element from the read end. Synchronises multiple callers
- by failing with NULL return. Returns NULL when deque is empty. */
-StgClosurePtr steal(SparkPool *deque) {
+static StgClosurePtr
+steal(SparkPool *deque)
+{
StgClosurePtr* pos;
StgClosurePtr* arraybase;
StgWord sz;
@@ -231,43 +217,39 @@ StgClosurePtr steal(SparkPool *deque) {
/* now decide whether we have won */
if ( !(CASTOP(&(deque->top),t,t+1)) ) {
- /* lost the race, someon else has changed top in the meantime */
- stolen = NULL;
+ /* lost the race, someon else has changed top in the meantime */
+ return NULL;
} /* else: OK, top has been incremented by the cas call */
-
ASSERT_SPARK_POOL_INVARIANTS(deque);
- /* return NULL or stolen element */
+ /* return stolen element */
return stolen;
}
StgClosure *
-findSpark (Capability *cap)
+tryStealSpark (SparkPool *pool)
{
- SparkPool *deque = (cap->sparks);
StgClosure *stolen;
- ASSERT_SPARK_POOL_INVARIANTS(deque);
-
do {
- /* keep trying until good spark found or pool looks empty.
- TODO is this a good idea? */
-
- stolen = steal(deque);
-
- } while ( ( !stolen /* nothing stolen*/
- || !closure_SHOULD_SPARK(stolen)) /* spark not OK */
- && !looksEmpty(deque)); /* run empty, give up */
+ stolen = steal(pool);
+ } while (stolen != NULL && !closure_SHOULD_SPARK(stolen));
- /* return stolen element */
return stolen;
}
-/* "guesses" whether a deque is empty. Can return false negatives in
- presence of concurrent steal() calls, and false positives in
- presence of a concurrent pushBottom().*/
-rtsBool looksEmpty(SparkPool* deque) {
+/* -----------------------------------------------------------------------------
+ *
+ * "guesses" whether a deque is empty. Can return false negatives in
+ * presence of concurrent steal() calls, and false positives in
+ * presence of a concurrent pushBottom().
+ *
+ * -------------------------------------------------------------------------- */
+
+rtsBool
+looksEmpty(SparkPool* deque)
+{
StgWord t = deque->top;
StgWord b = deque->bottom;
/* try to prefer false negatives by reading top first */
@@ -288,6 +270,7 @@ createSparkThread (Capability *cap, StgClosure *p)
tso = createGenThread (cap, RtsFlags.GcFlags.initialStkSize, p);
appendToRunQueue(cap,tso);
+ cap->sparks_converted++;
}
/* -----------------------------------------------------------------------------
@@ -297,11 +280,12 @@ createSparkThread (Capability *cap, StgClosure *p)
* -------------------------------------------------------------------------- */
#define DISCARD_NEW
-void pushBottom(SparkPool* deque, StgClosurePtr elem);
/* enqueue an element. Should always succeed by resizing the array
(not implemented yet, silently fails in that case). */
-void pushBottom(SparkPool* deque, StgClosurePtr elem) {
+static void
+pushBottom (SparkPool* deque, StgClosurePtr elem)
+{
StgWord t;
StgClosurePtr* pos;
StgWord sz = deque->moduloSize;
@@ -349,12 +333,16 @@ void pushBottom(SparkPool* deque, StgClosurePtr elem) {
}
-/* this is called as a direct C-call from Stg => we need to keep the
- pool in a register (???) */
+/* --------------------------------------------------------------------------
+ * newSpark: create a new spark, as a result of calling "par"
+ * Called directly from STG.
+ * -------------------------------------------------------------------------- */
+
StgInt
newSpark (StgRegTable *reg, StgClosure *p)
{
- SparkPool *pool = (reg->rCurrentTSO->cap->sparks);
+ Capability *cap = regTableToCapability(reg);
+ SparkPool *pool = cap->sparks;
/* I am not sure whether this is the right thing to do.
* Maybe it is better to exploit the tag information
@@ -368,6 +356,8 @@ newSpark (StgRegTable *reg, StgClosure *p)
pushBottom(pool,p);
}
+ cap->sparks_created++;
+
ASSERT_SPARK_POOL_INVARIANTS(pool);
return 1;
}
@@ -385,7 +375,7 @@ static void
pruneSparkQueue (Capability *cap)
{
SparkPool *pool;
- StgClosurePtr spark, evacspark, *elements;
+ StgClosurePtr spark, *elements;
nat n, pruned_sparks; // stats only
StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
@@ -457,6 +447,7 @@ pruneSparkQueue (Capability *cap)
n++;
} else {
pruned_sparks++; // discard spark
+ cap->sparks_pruned++;
}
currInd++;
@@ -528,7 +519,6 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
}
/* ----------------------------------------------------------------------------
-
* balanceSparkPoolsCaps: takes an array of capabilities (usually: all
* capabilities) and its size. Accesses all spark pools and equally
* distributes the sparks among them.
@@ -537,7 +527,8 @@ traverseSparkQueue (evac_fn evac, void *user, Capability *cap)
* -------------------------------------------------------------------------- */
void balanceSparkPoolsCaps(nat n_caps, Capability caps[]);
-void balanceSparkPoolsCaps(nat n_caps, Capability caps[]) {
+void balanceSparkPoolsCaps(nat n_caps STG_UNUSED,
+ Capability caps[] STG_UNUSED) {
barf("not implemented");
}
diff --git a/rts/Sparks.h b/rts/Sparks.h
index dbbf268988..4062a0b981 100644
--- a/rts/Sparks.h
+++ b/rts/Sparks.h
@@ -17,6 +17,40 @@
#if defined(THREADED_RTS)
+/* Spark pools: used to store pending sparks
+ * (THREADED_RTS & PARALLEL_HASKELL only)
+ * Implementation uses a DeQue to enable concurrent read accesses at
+ * the top end.
+ */
+typedef struct SparkPool_ {
+ /* Size of elements array. Used for modulo calculation: we round up
+ to powers of 2 and use the dyadic log (modulo == bitwise &) */
+ StgWord size;
+ StgWord moduloSize; /* bitmask for modulo */
+
+ /* top, index where multiple readers steal() (protected by a cas) */
+ volatile StgWord top;
+
+ /* bottom, index of next free place where one writer can push
+ elements. This happens unsynchronised. */
+ volatile StgWord bottom;
+ /* both position indices are continuously incremented, and used as
+ an index modulo the current array size. */
+
+ /* lower bound on the current top value. This is an internal
+ optimisation to avoid unnecessarily accessing the top field
+ inside pushBottom */
+ volatile StgWord topBound;
+
+ /* The elements array */
+ StgClosurePtr* elements;
+ /* Please note: the dataspace cannot follow the admin fields
+ immediately, as it should be possible to enlarge it without
+ disposing the old one automatically (as realloc would)! */
+
+} SparkPool;
+
+
/* INVARIANTS, in this order: bottom/top consistent, reasonable size,
topBound consistent, space pointer, space accessible to us */
#define ASSERT_SPARK_POOL_INVARIANTS(p) \
@@ -28,30 +62,25 @@
ASSERT(*((p)->elements) || 1); \
ASSERT(*((p)->elements - 1 + ((p)->size)) || 1);
-// missing in old interface. Currently called by initSparkPools
-// internally.
-SparkPool* initPool(StgWord size);
+// Initialisation
+void initSparkPools (void);
-// special case: accessing our own pool, at the write end
-// otherwise, we can always steal from our pool as the others do...
-StgClosure* reclaimSpark(Capability *cap);
+// Take a spark from the "write" end of the pool. Can be called
+// by the pool owner only.
+StgClosure* reclaimSpark(SparkPool *pool);
+// Returns True if the spark pool is empty (can give a false positive
+// if the pool is almost empty).
rtsBool looksEmpty(SparkPool* deque);
-// rest: same as old interface
-StgClosure * findSpark (Capability *cap);
-void initSparkPools (void);
+StgClosure * tryStealSpark (SparkPool *pool);
void freeSparkPool (SparkPool *pool);
void createSparkThread (Capability *cap, StgClosure *p);
void pruneSparkQueues (void);
void traverseSparkQueue(evac_fn evac, void *user, Capability *cap);
-INLINE_HEADER void discardSparks (SparkPool *pool);
-INLINE_HEADER nat sparkPoolSize (SparkPool *pool);
-
-INLINE_HEADER void discardSparksCap (Capability *cap);
-INLINE_HEADER nat sparkPoolSizeCap (Capability *cap);
-INLINE_HEADER rtsBool emptySparkPoolCap (Capability *cap);
+INLINE_HEADER void discardSparks (SparkPool *pool);
+INLINE_HEADER nat sparkPoolSize (SparkPool *pool);
#endif
/* -----------------------------------------------------------------------------
@@ -64,30 +93,16 @@ INLINE_HEADER rtsBool
emptySparkPool (SparkPool *pool)
{ return looksEmpty(pool); }
-INLINE_HEADER rtsBool
-emptySparkPoolCap (Capability *cap)
-{ return looksEmpty(cap->sparks); }
-
INLINE_HEADER nat
sparkPoolSize (SparkPool *pool)
-{
- return (pool->bottom - pool->top);
-}
-
-INLINE_HEADER nat
-sparkPoolSizeCap (Capability *cap)
-{ return sparkPoolSize(cap->sparks); }
+{ return (pool->bottom - pool->top); }
INLINE_HEADER void
discardSparks (SparkPool *pool)
{
- pool->top = pool->bottom = 0;
+ pool->top = pool->topBound = pool->bottom = 0;
}
-INLINE_HEADER void
-discardSparksCap (Capability *cap)
-{ return discardSparks(cap->sparks); }
-
#endif
#endif /* SPARKS_H */
diff --git a/rts/Stable.c b/rts/Stable.c
index a2c47d785c..94a756a380 100644
--- a/rts/Stable.c
+++ b/rts/Stable.c
@@ -6,9 +6,6 @@
*
* ---------------------------------------------------------------------------*/
-// Make static versions of inline functions in Stable.h:
-#define RTS_STABLE_C
-
#include "PosixSource.h"
#include "Rts.h"
#include "Hash.h"
diff --git a/rts/Stats.c b/rts/Stats.c
index 2e15613135..228f0c021e 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -641,6 +641,21 @@ stat_exit(int alloc)
TICK_TO_DBL(task->gc_etime));
}
}
+
+ {
+ nat i;
+ lnat sparks_created = 0;
+ lnat sparks_converted = 0;
+ lnat sparks_pruned = 0;
+ for (i = 0; i < n_capabilities; i++) {
+ sparks_created += capabilities[i].sparks_created;
+ sparks_converted += capabilities[i].sparks_converted;
+ sparks_pruned += capabilities[i].sparks_pruned;
+ }
+
+ statsPrintf(" SPARKS: %ld (%ld converted, %ld pruned)\n\n",
+ sparks_created, sparks_converted, sparks_pruned);
+ }
#endif
statsPrintf(" INIT time %6.2fs (%6.2fs elapsed)\n",