path: root/rts/Schedule.c
author     Simon Marlow <marlowsd@gmail.com>  2008-10-22 09:27:44 +0000
committer  Simon Marlow <marlowsd@gmail.com>  2008-10-22 09:27:44 +0000
commit     99df892cc9620fcc92747b79bba75dad8a1d295c (patch)
tree       536df57e1d9975f88ce781627bb2dacaee5b2c0c /rts/Schedule.c
parent     cf9650f2a1690c04051c716124bb0350adc74ae7 (diff)
Refactoring and reorganisation of the scheduler
Change the way we look for work in the scheduler. Previously, checking to
see whether there was anything to do was a non-side-effecting operation,
but this has changed now that we do work-stealing. This led to a
refactoring of the inner loop of the scheduler.

Also, lots of cleanup in the new work-stealing code, but no functional
changes.

One new statistic is added to the +RTS -s output:

  SPARKS: 1430 (2 converted, 1427 pruned)

which lets you know something about the use of `par` in the program.
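For context, the SPARKS line reports the fate of sparks created with `par`. Below is a minimal sketch of the kind of program that exercises it, using `par` and `pseq` from the `parallel` package; the module name, helper functions, and the cut-off threshold are illustrative only and are not part of this patch.

  -- ParFib.hs -- illustrative only, not part of this patch.
  import Control.Parallel (par, pseq)

  -- Naive Fibonacci, sparking one branch of each call with `par`.
  -- Sparks that an idle capability converts into threads are counted
  -- as "converted"; the remainder show up as "pruned" in the SPARKS
  -- statistic printed by +RTS -s.
  parFib :: Int -> Integer
  parFib n
    | n < 25    = fib n                      -- stop sparking for small work
    | otherwise = x `par` (y `pseq` (x + y))
    where
      x = parFib (n - 1)
      y = parFib (n - 2)

  fib :: Int -> Integer
  fib n | n < 2     = toInteger n
        | otherwise = fib (n - 1) + fib (n - 2)

  main :: IO ()
  main = print (parFib 35)

Compiled with `ghc -threaded ParFib.hs` (newer GHCs also need `-rtsopts`) and run as `./ParFib +RTS -N2 -s`, the statistics output includes a SPARKS line of the form shown above.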
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--  rts/Schedule.c | 231
1 file changed, 139 insertions(+), 92 deletions(-)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 09150fd8b5..e17c6534d6 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -137,17 +137,21 @@ static Capability *schedule (Capability *initialCapability, Task *task);
// scheduler clearer.
//
static void schedulePreLoop (void);
+static void scheduleFindWork (Capability *cap);
+#if defined(THREADED_RTS)
+static void scheduleYield (Capability **pcap, Task *task);
+#endif
static void scheduleStartSignalHandlers (Capability *cap);
static void scheduleCheckBlockedThreads (Capability *cap);
static void scheduleCheckWakeupThreads(Capability *cap USED_IF_NOT_THREADS);
static void scheduleCheckBlackHoles (Capability *cap);
static void scheduleDetectDeadlock (Capability *cap, Task *task);
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void schedulePushWork(Capability *cap, Task *task);
-static rtsBool scheduleGetRemoteWork(Capability *cap);
#if defined(PARALLEL_HASKELL)
+static rtsBool scheduleGetRemoteWork(Capability *cap);
static void scheduleSendPendingMessages(void);
#endif
+#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
static void scheduleActivateSpark(Capability *cap);
#endif
static void schedulePostRunThread(Capability *cap, StgTSO *t);
@@ -281,25 +285,6 @@ schedule (Capability *initialCapability, Task *task)
while (TERMINATION_CONDITION) {
-#if defined(THREADED_RTS)
- if (first) {
- // don't yield the first time, we want a chance to run this
- // thread for a bit, even if there are others banging at the
- // door.
- first = rtsFalse;
- ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
- } else {
- // Yield the capability to higher-priority tasks if necessary.
- yieldCapability(&cap, task);
- /* inside yieldCapability, attempts to steal work from other
- capabilities, unless the capability has own work.
- See (REMARK) below.
- */
- }
-#endif
-
- /* THIS WAS THE PLACE FOR THREADED_RTS::schedulePushWork(cap,task) */
-
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
// call).
@@ -367,62 +352,11 @@ schedule (Capability *initialCapability, Task *task)
barf("sched_state: %d", sched_state);
}
- /* this was the place to activate a spark, now below... */
-
- scheduleStartSignalHandlers(cap);
+ scheduleFindWork(cap);
- // Only check the black holes here if we've nothing else to do.
- // During normal execution, the black hole list only gets checked
- // at GC time, to avoid repeatedly traversing this possibly long
- // list each time around the scheduler.
- if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
-
- scheduleCheckWakeupThreads(cap);
-
- scheduleCheckBlockedThreads(cap);
-
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
- /* work distribution in multithreaded and parallel systems
-
- REMARK: IMHO best location for work-stealing as well.
- tests above might yield some new jobs, so no need to steal a
- spark in some cases. I believe the yieldCapability.. above
- should be moved here.
- */
-
-#if defined(PARALLEL_HASKELL)
- /* if messages have been buffered... a NOOP in THREADED_RTS */
- scheduleSendPendingMessages();
-#endif
-
- /* If the run queue is empty,...*/
- if (emptyRunQueue(cap)) {
- /* ...take one of our own sparks and turn it into a thread */
- scheduleActivateSpark(cap);
-
- /* if this did not work, try to steal a spark from someone else */
- if (emptyRunQueue(cap)) {
-#if defined(PARALLEL_HASKELL)
- receivedFinish = scheduleGetRemoteWork(cap);
- continue; // a new round, (hopefully) with new work
- /*
- in GUM, this a) sends out a FISH and returns IF no fish is
- out already
- b) (blocking) awaits and receives messages
-
- in Eden, this is only the blocking receive, as b) in GUM.
-
- in Threaded-RTS, this does plain nothing. Stealing routine
- is inside Capability.c and called from
- yieldCapability() at the very beginning, see REMARK.
- */
-#endif
- }
- } else { /* i.e. run queue was (initially) not empty */
- schedulePushWork(cap,task);
- /* work pushing, currently relevant only for THREADED_RTS:
- (pushes threads, wakes up idle capabilities for stealing) */
- }
+ /* work pushing, currently relevant only for THREADED_RTS:
+ (pushes threads, wakes up idle capabilities for stealing) */
+ schedulePushWork(cap,task);
#if defined(PARALLEL_HASKELL)
/* since we perform a blocking receive and continue otherwise,
@@ -439,9 +373,8 @@ schedule (Capability *initialCapability, Task *task)
}
#endif // PARALLEL_HASKELL: non-empty run queue!
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
-
scheduleDetectDeadlock(cap,task);
+
#if defined(THREADED_RTS)
cap = task->cap; // reload cap, it might have changed
#endif
@@ -454,12 +387,27 @@ schedule (Capability *initialCapability, Task *task)
//
// win32: might be here due to awaitEvent() being abandoned
// as a result of a console event having been delivered.
- if ( emptyRunQueue(cap) ) {
+
+#if defined(THREADED_RTS)
+ if (first)
+ {
+ // XXX: ToDo
+ // // don't yield the first time, we want a chance to run this
+ // // thread for a bit, even if there are others banging at the
+ // // door.
+ // first = rtsFalse;
+ // ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
+ }
+
+ scheduleYield(&cap,task);
+ if (emptyRunQueue(cap)) continue; // look for work again
+#endif
+
#if !defined(THREADED_RTS) && !defined(mingw32_HOST_OS)
+ if ( emptyRunQueue(cap) ) {
ASSERT(sched_state >= SCHED_INTERRUPTING);
-#endif
- continue; // nothing to do
}
+#endif
//
// Get a thread to run
@@ -683,12 +631,110 @@ schedulePreLoop(void)
}
/* -----------------------------------------------------------------------------
+ * scheduleFindWork()
+ *
+ * Search for work to do, and handle messages from elsewhere.
+ * -------------------------------------------------------------------------- */
+
+static void
+scheduleFindWork (Capability *cap)
+{
+ scheduleStartSignalHandlers(cap);
+
+ // Only check the black holes here if we've nothing else to do.
+ // During normal execution, the black hole list only gets checked
+ // at GC time, to avoid repeatedly traversing this possibly long
+ // list each time around the scheduler.
+ if (emptyRunQueue(cap)) { scheduleCheckBlackHoles(cap); }
+
+ scheduleCheckWakeupThreads(cap);
+
+ scheduleCheckBlockedThreads(cap);
+
+#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
+ // Try to activate one of our own sparks
+ if (emptyRunQueue(cap)) { scheduleActivateSpark(cap); }
+#endif
+
+#if defined(THREADED_RTS)
+ // Try to steal work if we don't have any
+ if (emptyRunQueue(cap)) { stealWork(cap); }
+#endif
+
+#if defined(PARALLEL_HASKELL)
+ // if messages have been buffered...
+ scheduleSendPendingMessages();
+#endif
+
+#if defined(PARALLEL_HASKELL)
+ if (emptyRunQueue(cap)) {
+ receivedFinish = scheduleGetRemoteWork(cap);
+ continue; // a new round, (hopefully) with new work
+ /*
+ in GUM, this a) sends out a FISH and returns IF no fish is
+ out already
+ b) (blocking) awaits and receives messages
+
+ in Eden, this is only the blocking receive, as b) in GUM.
+ */
+ }
+#endif
+}
+
+#if defined(THREADED_RTS)
+STATIC_INLINE rtsBool
+shouldYieldCapability (Capability *cap, Task *task)
+{
+ // we need to yield this capability to someone else if..
+ // - another thread is initiating a GC
+ // - another Task is returning from a foreign call
+ // - the thread at the head of the run queue cannot be run
+ // by this Task (it is bound to another Task, or it is unbound
+ // and this Task is bound).
+ return (waiting_for_gc ||
+ cap->returning_tasks_hd != NULL ||
+ (!emptyRunQueue(cap) && (task->tso == NULL
+ ? cap->run_queue_hd->bound != NULL
+ : cap->run_queue_hd->bound != task)));
+}
+
+// This is the single place where a Task goes to sleep. There are
+// two reasons it might need to sleep:
+// - there are no threads to run
+// - we need to yield this Capability to someone else
+// (see shouldYieldCapability())
+//
+// On return, *pcap points to the Capability now held by this Task; it
+// may have changed during the yield.
+
+static void
+scheduleYield (Capability **pcap, Task *task)
+{
+ Capability *cap = *pcap;
+
+ // if we have work, and we don't need to give up the Capability, continue.
+ if (!emptyRunQueue(cap) && !shouldYieldCapability(cap,task))
+ return;
+
+ // otherwise yield (sleep), and keep yielding if necessary.
+ do {
+ yieldCapability(&cap,task);
+ }
+ while (shouldYieldCapability(cap,task));
+
+ // note there may still be no threads on the run queue at this
+ // point, the caller has to check.
+
+ *pcap = cap;
+ return;
+}
+#endif
+
+/* -----------------------------------------------------------------------------
* schedulePushWork()
*
* Push work to other Capabilities if we have some.
* -------------------------------------------------------------------------- */
-#if defined(THREADED_RTS) || defined(PARALLEL_HASKELL)
static void
schedulePushWork(Capability *cap USED_IF_THREADS,
Task *task USED_IF_THREADS)
@@ -788,7 +834,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// i is the next free capability to push to
for (; i < n_free_caps; i++) {
if (emptySparkPoolCap(free_caps[i])) {
- spark = findSpark(cap);
+ spark = tryStealSpark(cap->sparks);
if (spark != NULL) {
debugTrace(DEBUG_sched, "pushing spark %p to capability %d", spark, free_caps[i]->no);
newSpark(&(free_caps[i]->r), spark);
@@ -801,18 +847,14 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
// release the capabilities
for (i = 0; i < n_free_caps; i++) {
task->cap = free_caps[i];
- releaseCapability(free_caps[i]);
+ releaseAndWakeupCapability(free_caps[i]);
}
- // now wake them all up, and they might steal sparks if
- // the did not get a thread
- prodAllCapabilities();
}
task->cap = cap; // reset to point to our Capability.
#endif /* THREADED_RTS */
}
-#endif /* THREADED_RTS || PARALLEL_HASKELL */
/* ----------------------------------------------------------------------------
* Start any pending signal handlers
@@ -1031,7 +1073,12 @@ scheduleActivateSpark(Capability *cap)
on our run queue in the meantime ? But would need a lock.. */
return;
- spark = findSpark(cap); // defined in Sparks.c
+
+ // Really we should be using reclaimSpark() here, but
+ // experimentally it doesn't seem to perform as well as just
+ // stealing from our own spark pool:
+ // spark = reclaimSpark(cap->sparks);
+ spark = tryStealSpark(cap->sparks); // defined in Sparks.c
if (spark != NULL) {
debugTrace(DEBUG_sched,
@@ -1046,9 +1093,9 @@ scheduleActivateSpark(Capability *cap)
* Get work from a remote node (PARALLEL_HASKELL only)
* ------------------------------------------------------------------------- */
-#if defined(PARALLEL_HASKELL) || defined(THREADED_RTS)
+#if defined(PARALLEL_HASKELL)
static rtsBool /* return value used in PARALLEL_HASKELL only */
-scheduleGetRemoteWork(Capability *cap)
+scheduleGetRemoteWork (Capability *cap STG_UNUSED)
{
#if defined(PARALLEL_HASKELL)
rtsBool receivedFinish = rtsFalse;
@@ -1800,7 +1847,7 @@ suspendThread (StgRegTable *reg)
suspendTask(cap,task);
cap->in_haskell = rtsFalse;
- releaseCapability_(cap);
+ releaseCapability_(cap,rtsFalse);
RELEASE_LOCK(&cap->lock);