diff options
author | Simon Marlow <marlowsd@gmail.com> | 2008-10-22 09:27:44 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2008-10-22 09:27:44 +0000 |
commit | 99df892cc9620fcc92747b79bba75dad8a1d295c (patch) | |
tree | 536df57e1d9975f88ce781627bb2dacaee5b2c0c /rts/Schedule.c | |
parent | cf9650f2a1690c04051c716124bb0350adc74ae7 (diff) | |
download | haskell-99df892cc9620fcc92747b79bba75dad8a1d295c.tar.gz |
Refactoring and reorganisation of the scheduler
Change the way we look for work in the scheduler. Previously,
checking to see whether there was anything to do was a
non-side-effecting operation, but this has changed now that we do
work-stealing. This led to a refactoring of the inner loop of the
scheduler.
Also, lots of cleanup in the new work-stealing code, but no functional
changes.
One new statistic is added to the +RTS -s output:
SPARKS: 1430 (2 converted, 1427 pruned)
lets you know something about the use of `par` in the program.
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 231 |
1 files changed, 139 insertions, 92 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index 09150fd8b5..e17c6534d6 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task); // scheduler clearer. // static void schedulePreLoop (void); +static void scheduleFindWork (Capability *cap); +#if defined(THREADED_RTS) +static void scheduleYield (Capability **pcap, Task *task); +#endif static void scheduleStartSignalHandlers (Capability *cap); static void scheduleCheckBlockedThreads (Capability *cap); static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS); static void scheduleCheckBlackHoles (Capability *cap); static void scheduleDetectDeadlock (Capability *cap, Task *task); -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void schedulePushWork(Capability *cap, Task *task); -static rtsBool scheduleGetRemoteWork(Capability *cap); #if defined(PARALLEL_HASKELL) +static rtsBool scheduleGetRemoteWork(Capability *cap); static void scheduleSendPendingMessages(void); #endif +#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) static void scheduleActivateSpark(Capability *cap); #endif static void schedulePostRunThread(Capability *cap, StgTSO *t); @@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task) while (TERMINATION_CONDITION) { -#if defined(THREADED_RTS) - if (first) { - // don't yield the first time, we want a chance to run this - // thread for a bit, even if there are others banging at the - // door. - first = rtsFalse; - ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); - } else { - // Yield the capability to higher-priority tasks if necessary. - yieldCapability(&cap, task); - /* inside yieldCapability, attempts to steal work from other - capabilities, unless the capability has own work. - See (REMARK) below. 
- */ - } -#endif - - /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */ - // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign // call). @@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task) barf("sched_state: %d", sched_state); } - /* this was the place to activate a spark, now below... */ - - scheduleStartSignalHandlers(cap); + scheduleFindWork(cap); - // Only check the black holes here if we've nothing else to do. - // During normal execution, the black hole list only gets checked - // at GC time, to avoid repeatedly traversing this possibly long - // list each time around the scheduler. - if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } - - scheduleCheckWakeupThreads(cap); - - scheduleCheckBlockedThreads(cap); - -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) - /* work distribution in multithreaded and parallel systems - - REMARK: IMHO best location for work-stealing as well. - tests above might yield some new jobs, so no need to steal a - spark in some cases. I believe the yieldCapability.. above - should be moved here. - */ - -#if defined(PARALLEL_HASKELL) - /* if messages have been buffered... a NOOP in THREADED_RTS */ - scheduleSendPendingMessages(); -#endif - - /* If the run queue is empty,...*/ - if (emptyRunQueue(cap)) { - /* ...take one of our own sparks and turn it into a thread */ - scheduleActivateSpark(cap); - - /* if this did not work, try to steal a spark from someone else */ - if (emptyRunQueue(cap)) { -#if defined(PARALLEL_HASKELL) - receivedFinish = scheduleGetRemoteWork(cap); - continue; // a new round, (hopefully) with new work - /* - in GUM, this a) sends out a FISH and returns IF no fish is - out already - b) (blocking) awaits and receives messages - - in Eden, this is only the blocking receive, as b) in GUM. - - in Threaded-RTS, this does plain nothing. 
Stealing routine - is inside Capability.c and called from - yieldCapability() at the very beginning, see REMARK. - */ -#endif - } - } else { /* i.e. run queue was (initially) not empty */ - schedulePushWork(cap,task); - /* work pushing, currently relevant only for THREADED_RTS: - (pushes threads, wakes up idle capabilities for stealing) */ - } + /* work pushing, currently relevant only for THREADED_RTS: + (pushes threads, wakes up idle capabilities for stealing) */ + schedulePushWork(cap,task); #if defined(PARALLEL_HASKELL) /* since we perform a blocking receive and continue otherwise, @@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task) } #endif // PARALLEL_HASKELL: non-empty run queue! -#endif /* THREADED_RTS || PARALLEL_HASKELL */ - scheduleDetectDeadlock(cap,task); + #if defined(THREADED_RTS) cap = task->cap; // reload cap, it might have changed #endif @@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task) // // win32: might be here due to awaitEvent() being abandoned // as a result of a console event having been delivered. - if ( emptyRunQueue(cap) ) { + +#if defined(THREADED_RTS) + if (first) + { + // XXX: ToDo + // // don't yield the first time, we want a chance to run this + // // thread for a bit, even if there are others banging at the + // // door. + // first = rtsFalse; + // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); + } + + scheduleYield(&cap,task); + if (emptyRunQueue(cap)) continue; // look for work again +#endif + #if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS) + if ( emptyRunQueue(cap) ) { ASSERT(sched_state >= SCHED_INTERRUPTING); -#endif - continue; // nothing to do } +#endif // // Get a thread to run @@ -683,12 +631,110 @@ schedulePreLoop(void) } /* ----------------------------------------------------------------------------- + * scheduleFindWork() + * + * Search for work to do, and handle messages from elsewhere. 
+ * -------------------------------------------------------------------------- */ + +static void +scheduleFindWork (Capability *cap) +{ + scheduleStartSignalHandlers(cap); + + // Only check the black holes here if we've nothing else to do. + // During normal execution, the black hole list only gets checked + // at GC time, to avoid repeatedly traversing this possibly long + // list each time around the scheduler. + if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); } + + scheduleCheckWakeupThreads(cap); + + scheduleCheckBlockedThreads(cap); + +#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) + // Try to activate one of our own sparks + if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); } +#endif + +#if defined(THREADED_RTS) + // Try to steal work if we don't have any + if (emptyRunQueue(cap)) { stealWork(cap); } +#endif + +#if defined(PARALLEL_HASKELL) + // if messages have been buffered... + scheduleSendPendingMessages(); +#endif + +#if defined(PARALLEL_HASKELL) + if (emptyRunQueue(cap)) { + receivedFinish = scheduleGetRemoteWork(cap); + continue; // a new round, (hopefully) with new work + /* + in GUM, this a) sends out a FISH and returns IF no fish is + out already + b) (blocking) awaits and receives messages + + in Eden, this is only the blocking receive, as b) in GUM. + */ + } +#endif +} + +#if defined(THREADED_RTS) +STATIC_INLINE rtsBool +shouldYieldCapability (Capability *cap, Task *task) +{ + // we need to yield this capability to someone else if.. + // - another thread is initiating a GC + // - another Task is returning from a foreign call + // - the thread at the head of the run queue cannot be run + // by this Task (it is bound to another Task, or it is unbound + // and this task is bound). + return (waiting_for_gc || + cap->returning_tasks_hd != NULL || + (!emptyRunQueue(cap) && (task->tso == NULL + ? cap->run_queue_hd->bound != NULL + : cap->run_queue_hd->bound != task))); +} + +// This is the single place where a Task goes to sleep. 
There are +// two reasons it might need to sleep: +// - there are no threads to run +// - we need to yield this Capability to someone else +// (see shouldYieldCapability()) +// +// The return value indicates whether + +static void +scheduleYield (Capability **pcap, Task *task) +{ + Capability *cap = *pcap; + + // if we have work, and we don't need to give up the Capability, continue. + if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task)) + return; + + // otherwise yield (sleep), and keep yielding if necessary. + do { + yieldCapability(&cap,task); + } + while (shouldYieldCapability(cap,task)); + + // note there may still be no threads on the run queue at this + // point, the caller has to check. + + *pcap = cap; + return; +} +#endif + +/* ----------------------------------------------------------------------------- * schedulePushWork() * * Push work to other Capabilities if we have some. * -------------------------------------------------------------------------- */ -#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL) static void schedulePushWork(Capability *cap USED_IF_THREADS, Task *task USED_IF_THREADS) @@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // i is the next free capability to push to for (; i < n_free_caps; i++) { if (emptySparkPoolCap(free_caps[i])) { - spark = findSpark(cap); + spark = tryStealSpark(cap->sparks); if (spark != NULL) { debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no); newSpark(&(free_caps[i]->r), spark); @@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS, // release the capabilities for (i = 0; i < n_free_caps; i++) { task->cap = free_caps[i]; - releaseCapability(free_caps[i]); + releaseAndWakeupCapability(free_caps[i]); } - // now wake them all up, and they might steal sparks if - // the did not get a thread - prodAllCapabilities(); } task->cap = cap; // reset to point to our Capability. 
#endif /* THREADED_RTS */ } -#endif /* THREADED_RTS || PARALLEL_HASKELL */ /* ---------------------------------------------------------------------------- * Start any pending signal handlers @@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap) on our run queue in the meantime ? But would need a lock.. */ return; - spark = findSpark(cap); // defined in Sparks.c + + // Really we should be using reclaimSpark() here, but + // experimentally it doesn't seem to perform as well as just + // stealing from our own spark pool: + // spark = reclaimSpark(cap->sparks); + spark = tryStealSpark(cap->sparks); // defined in Sparks.c if (spark != NULL) { debugTrace(DEBUG_sched, @@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap) * Get work from a remote node (PARALLEL_HASKELL only) * ------------------------------------------------------------------------- */ -#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS) +#if defined(PARALLEL_HASKELL) static rtsBool /* return value used in PARALLEL_HASKELL only */ -scheduleGetRemoteWork(Capability *cap) +scheduleGetRemoteWork (Capability *cap STG_UNUSED) { #if defined(PARALLEL_HASKELL) rtsBool receivedFinish = rtsFalse; @@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg) suspendTask(cap,task); cap->in_haskell = rtsFalse; - releaseCapability_(cap); + releaseCapability_(cap,rtsFalse); RELEASE_LOCK(&cap->lock); |