author    Simon Marlow <marlowsd@gmail.com>  2016-07-27 16:00:08 +0100
committer Simon Marlow <marlowsd@gmail.com>  2016-08-03 08:07:34 +0100
commit    988ad8ba8e709eff3cea59728e481bb269fa6185 (patch)
tree      1a977f33b53ad1188900c077b4970cce3f078127 /rts/Schedule.c
parent    55f5aed756cd5d464942dddcb33e0bd19b05f2a4 (diff)
download  haskell-988ad8ba8e709eff3cea59728e481bb269fa6185.tar.gz
Fix to thread migration
Summary:

If we had 2 threads on the run queue, say [A,B], and B is bound to the
current Task, then we would fail to migrate any threads.  This fixes it
so that we would migrate A in that case.

This will help parallelism a bit in programs that have lots of bound
threads.

Test Plan: Test program in #12419, which is actually not a great program
but it does behave a bit better after this change.

Reviewers: ezyang, niteria, bgamari, austin, erikd

Subscribers: thomie

Differential Revision: https://phabricator.haskell.org/D2430

GHC Trac Issues: #12419
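To see the failure concretely: the old queue walk (removed in the large
hunk below) kept the head of the run queue unconditionally and only
scanned from its successor, so with [A,B] and B bound, A was never even
considered for migration. A standalone toy model of that old walk (a
sketch only, not RTS code; the string-as-queue encoding is invented and
uppercase marks an immovable thread):

    /* Toy model of the old walk over run queue "aB", where B is bound
     * to the current Task and cannot move. */
    #include <ctype.h>
    #include <stdio.h>
    #include <string.h>

    static void old_walk(const char *queue, int n_free_caps) {
        int i = 0;
        /* The head was kept unconditionally: the scan began at its
         * successor (prev = run_queue_hd; t = prev->_link). */
        printf("keep %c (head)", queue[0]);
        for (size_t t = 1; t < strlen(queue); t++) {
            if (isupper((unsigned char)queue[t])) {
                printf(", keep %c (immovable)", queue[t]);
            } else if (i == n_free_caps) {
                printf(", keep %c (our share)", queue[t]);
                i = 0;
            } else {
                printf(", push %c to cap %d", queue[t], i++);
            }
        }
        printf("\n");
    }

    int main(void) {
        old_walk("aB", 1);  /* keep a (head), keep B (immovable):
                             * the idle capability receives nothing */
        return 0;
    }

After the patch, the walk starts at the head, so in the same scenario A
is pushed to the idle capability and B is kept as this capability's
share.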
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--  rts/Schedule.c  87
1 file changed, 63 insertions, 24 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index ee2d7dbb0d..908acf27fe 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -702,13 +702,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
Capability *free_caps[n_capabilities], *cap0;
uint32_t i, n_wanted_caps, n_free_caps;
+ uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0;
+
// migration can be turned off with +RTS -qm
- if (!RtsFlags.ParFlags.migrate) return;
+ if (!RtsFlags.ParFlags.migrate) {
+ spare_threads = 0;
+ }
// Figure out how many capabilities we want to wake up. We need at least
// sparkPoolSize(cap) plus the number of spare threads we have.
- n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
-
+ n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads;
if (n_wanted_caps == 0) return;
// First grab as many free Capabilities as we can. ToDo: we should use
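Previously, +RTS -qm made schedulePushWork return immediately, which
also disabled spark pushing; now it only zeroes spare_threads, so
sparks still contribute to n_wanted_caps. A small sketch of the new
accounting (hypothetical function; its arguments stand in for the
Capability fields, and the stub values are invented):

    #include <stdint.h>
    #include <stdio.h>

    static uint32_t wanted_caps(uint32_t n_run_queue,
                                uint32_t spark_pool_size, int migrate) {
        uint32_t spare_threads = n_run_queue > 0 ? n_run_queue - 1 : 0;
        if (!migrate) {
            spare_threads = 0;  /* -qm pins threads, but sparks can
                                 * still be given away */
        }
        return spark_pool_size + spare_threads;
    }

    int main(void) {
        printf("%u\n", wanted_caps(3, 2, 1)); /* 4: 2 spare threads + 2 sparks */
        printf("%u\n", wanted_caps(3, 2, 0)); /* 2: sparks only, under -qm     */
        return 0;
    }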
@@ -730,10 +733,22 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
}
}
- // we now have n_free_caps free capabilities stashed in
- // free_caps[]. Share our run queue equally with them. This is
- // probably the simplest thing we could do; improvements we might
- // want to do include:
+ // We now have n_free_caps free capabilities stashed in
+ // free_caps[]. Attempt to share our run queue equally with them.
+ // This is complicated slightly by the fact that we can't move
+ // some threads:
+ //
+ // - threads that have TSO_LOCKED cannot migrate
+ // - a thread that is bound to the current Task cannot be migrated
+ //
+ // So we walk through the run queue, migrating threads to
+ // free_caps[] round-robin, skipping over immovable threads. Each
+ // time through free_caps[] we keep one thread for ourselves,
+ // provided we haven't encountered one or more immovable threads
+ // in this pass.
+ //
+ // This is about the simplest thing we could do; improvements we
+ // might want to do include:
//
// - giving high priority to moving relatively new threads, on
// the grounds that they haven't had time to build up a
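The policy described in the new comment above can be modelled in a few
lines. In this sketch (illustrative only; share and the
uppercase-means-immovable encoding are invented), an immovable keep
consumes the voluntary keep for its pass via the imbalance counter;
purely to keep the toy total, a thread arriving at a pass boundary with
outstanding imbalance is migrated rather than kept:

    #include <ctype.h>
    #include <stdio.h>
    #include <string.h>

    static void share(const char *queue, int n_free_caps) {
        int i = 0;               /* next free capability, round-robin   */
        unsigned imbalance = 0;  /* involuntary keeps minus passes done */
        for (size_t t = 0; t < strlen(queue); t++) {
            char c = queue[t];
            if (isupper((unsigned char)c)) {    /* bound or TSO_LOCKED */
                printf("keep %c (immovable)\n", c);
                imbalance++;
            } else if (i == n_free_caps) {      /* one pass completed  */
                i = 0;
                if (imbalance == 0) {
                    printf("keep %c (our share)\n", c);
                } else {
                    imbalance--;  /* an immovable keep already counted
                                   * as our share this pass            */
                    printf("push %c to cap %d\n", c, i++);
                }
            } else {
                printf("push %c to cap %d\n", c, i++);
            }
        }
    }

    int main(void) {
        /* a->cap0, keep B (bound), c->cap1, d->cap0, e->cap1:
         * the bound keep replaces the voluntary one for that pass. */
        share("aBcde", 2);
        return 0;
    }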
@@ -748,10 +763,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
#endif
debugTrace(DEBUG_sched,
- "cap %d: %s and %d free capabilities, sharing...",
- cap->no,
- (cap->n_run_queue > 1)?
- "excess threads on run queue":"sparks to share (>=2)",
+ "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...",
+ cap->no, cap->n_run_queue, sparkPoolSizeCap(cap),
n_free_caps);
i = 0;
@@ -759,27 +772,56 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
pushed_to_all = rtsFalse;
#endif
- if (cap->run_queue_hd != END_TSO_QUEUE) {
- prev = cap->run_queue_hd;
- t = prev->_link;
- prev->_link = END_TSO_QUEUE;
+ // We want to share threads equally amongst free_caps[] and the
+ // current capability, but sometimes we encounter immovable
+ // threads. This counter tracks the number of threads we have kept
+ // for the current capability minus the number of passes over
+ // free_caps[]. If it is greater than zero (due to immovable
+ // threads), we should try to bring it back to zero again by not
+ // keeping any threads for the current capability.
+ uint32_t imbalance = 0;
+
+ // n_free_caps may be larger than the number of spare threads we have,
+ // if there were sparks in the spark pool. To avoid giving away all our
+ // threads in this case, we limit the number of caps that we give
+ // threads to, to the number of spare threads (n_run_queue-1).
+ uint32_t thread_recipients = stg_min(spare_threads, n_free_caps);
+
+ if (thread_recipients > 0) {
+ prev = END_TSO_QUEUE;
+ t = cap->run_queue_hd;
for (; t != END_TSO_QUEUE; t = next) {
next = t->_link;
t->_link = END_TSO_QUEUE;
if (t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
- setTSOLink(cap, prev, t);
+ if (prev == END_TSO_QUEUE) {
+ cap->run_queue_hd = t;
+ } else {
+ setTSOLink(cap, prev, t);
+ }
setTSOPrev(cap, t, prev);
prev = t;
- } else if (i == n_free_caps) {
+ imbalance++;
+ } else if (i == thread_recipients) {
#ifdef SPARK_PUSHING
pushed_to_all = rtsTrue;
#endif
+ // If we have not already kept any threads for this
+ // capability during the current pass over free_caps[],
+ // keep one now.
+ if (imbalance == 0) {
+ if (prev == END_TSO_QUEUE) {
+ cap->run_queue_hd = t;
+ } else {
+ setTSOLink(cap, prev, t);
+ }
+ setTSOPrev(cap, t, prev);
+ prev = t;
+ } else {
+ imbalance--;
+ }
i = 0;
- // keep one for us
- setTSOLink(cap, prev, t);
- setTSOPrev(cap, t, prev);
- prev = t;
} else {
appendToRunQueue(free_caps[i],t);
cap->n_run_queue--;
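Kept threads are relinked in place: the first becomes the new
run_queue_hd, and each later keep is chained on with setTSOLink and
setTSOPrev. A minimal sketch of that rebuild-in-place pattern (types
and helpers invented; the RTS works on StgTSO):

    #include <stdbool.h>
    #include <stdio.h>

    typedef struct Node {
        int id;
        bool keep;                /* false: would migrate elsewhere */
        struct Node *link, *prev;
    } Node;

    static Node *rebuild(Node *hd) {
        Node *prev = NULL, *new_hd = NULL, *next;
        for (Node *t = hd; t != NULL; t = next) {
            next = t->link;
            t->link = NULL;                  /* unlink, as the patch does */
            if (t->keep) {
                if (prev == NULL) new_hd = t;      /* new run_queue_hd    */
                else              prev->link = t;  /* setTSOLink(prev, t) */
                t->prev = prev;                    /* setTSOPrev(t, prev) */
                prev = t;
            }
            /* else: t would be appended to a free capability's queue */
        }
        return new_hd;
    }

    int main(void) {
        Node c = {3, true,  NULL, NULL};
        Node b = {2, false, &c,   NULL};
        Node a = {1, true,  &b,   NULL};
        for (Node *t = rebuild(&a); t != NULL; t = t->link)
            printf("kept %d\n", t->id);  /* kept 1, kept 3 */
        return 0;
    }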
@@ -2194,9 +2236,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
n_capabilities = enabled_capabilities = new_n_capabilities;
}
- // Start worker tasks on the new Capabilities
- startWorkerTasks(old_n_capabilities, new_n_capabilities);
-
// We're done: release the original Capabilities
releaseAllCapabilities(old_n_capabilities, cap,task);