diff options
author | Simon Marlow <marlowsd@gmail.com> | 2016-07-27 16:00:08 +0100 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2016-08-03 08:07:34 +0100 |
commit | 988ad8ba8e709eff3cea59728e481bb269fa6185 (patch) | |
tree | 1a977f33b53ad1188900c077b4970cce3f078127 /rts/Schedule.c | |
parent | 55f5aed756cd5d464942dddcb33e0bd19b05f2a4 (diff) | |
download | haskell-988ad8ba8e709eff3cea59728e481bb269fa6185.tar.gz |
Fix to thread migration
Summary:
If we had 2 threads on the run queue, say [A,B], and B is bound to the
current Task, then we would fail to migrate any threads. This fixes it
so that we would migrate A in that case.
This will help parallelism a bit in programs that have lots of bound
threads.
Test Plan:
Test program in #12419, which is actually not a great program but it
does behave a bit better after this change.
Reviewers: ezyang, niteria, bgamari, austin, erikd
Subscribers: thomie
Differential Revision: https://phabricator.haskell.org/D2430
GHC Trac Issues: #12419
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 87 |
1 file changed, 63 insertions, 24 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index ee2d7dbb0d..908acf27fe 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -702,13 +702,16 @@ schedulePushWork(Capability *cap USED_IF_THREADS, Capability *free_caps[n_capabilities], *cap0; uint32_t i, n_wanted_caps, n_free_caps; + uint32_t spare_threads = cap->n_run_queue > 0 ? cap->n_run_queue - 1 : 0; + // migration can be turned off with +RTS -qm - if (!RtsFlags.ParFlags.migrate) return; + if (!RtsFlags.ParFlags.migrate) { + spare_threads = 0; + } // Figure out how many capabilities we want to wake up. We need at least // sparkPoolSize(cap) plus the number of spare threads we have. - n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1; - + n_wanted_caps = sparkPoolSizeCap(cap) + spare_threads; if (n_wanted_caps == 0) return; // First grab as many free Capabilities as we can. ToDo: we should use @@ -730,10 +733,22 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } } - // we now have n_free_caps free capabilities stashed in - // free_caps[]. Share our run queue equally with them. This is - // probably the simplest thing we could do; improvements we might - // want to do include: + // We now have n_free_caps free capabilities stashed in + // free_caps[]. Attempt to share our run queue equally with them. + // This is complicated slightly by the fact that we can't move + // some threads: + // + // - threads that have TSO_LOCKED cannot migrate + // - a thread that is bound to the current Task cannot be migrated + // + // So we walk through the run queue, migrating threads to + // free_caps[] round-robin, skipping over immovable threads. Each + // time through free_caps[] we keep one thread for ourselves, + // provided we haven't encountered one or more immovable threads + // in this pass. 
+ // + // This is about the simplest thing we could do; improvements we + // might want to do include: // // - giving high priority to moving relatively new threads, on // the grounds that they haven't had time to build up a @@ -748,10 +763,8 @@ schedulePushWork(Capability *cap USED_IF_THREADS, #endif debugTrace(DEBUG_sched, - "cap %d: %s and %d free capabilities, sharing...", - cap->no, - (cap->n_run_queue > 1)? - "excess threads on run queue":"sparks to share (>=2)", + "cap %d: %d threads, %d sparks, and %d free capabilities, sharing...", + cap->no, cap->n_run_queue, sparkPoolSizeCap(cap), n_free_caps); i = 0; @@ -759,27 +772,56 @@ schedulePushWork(Capability *cap USED_IF_THREADS, pushed_to_all = rtsFalse; #endif - if (cap->run_queue_hd != END_TSO_QUEUE) { - prev = cap->run_queue_hd; - t = prev->_link; - prev->_link = END_TSO_QUEUE; + // We want to share threads equally amongst free_caps[] and the + // current capability, but sometimes we encounter immovable + // threads. This counter tracks the number of threads we have kept + // for the current capability minus the number of passes over + // free_caps[]. If it is greater than zero (due to immovable + // threads), we should try to bring it back to zero again by not + // keeping any threads for the current capability. + uint32_t imbalance = 0; + + // n_free_caps may be larger than the number of spare threads we have, + // if there were sparks in the spark pool. To avoid giving away all our + // threads in this case, we limit the number of caps that we give + // threads to, to the number of spare threads (n_run_queue-1). 
+ uint32_t thread_recipients = stg_min(spare_threads, n_free_caps); + + if (thread_recipients > 0) { + prev = END_TSO_QUEUE; + t = cap->run_queue_hd; for (; t != END_TSO_QUEUE; t = next) { next = t->_link; t->_link = END_TSO_QUEUE; if (t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread - setTSOLink(cap, prev, t); + if (prev == END_TSO_QUEUE) { + cap->run_queue_hd = t; + } else { + setTSOLink(cap, prev, t); + } setTSOPrev(cap, t, prev); prev = t; - } else if (i == n_free_caps) { + imbalance++; + } else if (i == thread_recipients) { #ifdef SPARK_PUSHING pushed_to_all = rtsTrue; #endif + // If we have not already kept any threads for this + // capability during the current pass over free_caps[], + // keep one now. + if (imbalance == 0) { + if (prev == END_TSO_QUEUE) { + cap->run_queue_hd = t; + } else { + setTSOLink(cap, prev, t); + } + setTSOPrev(cap, t, prev); + prev = t; + } else { + imbalance--; + } i = 0; - // keep one for us - setTSOLink(cap, prev, t); - setTSOPrev(cap, t, prev); - prev = t; } else { appendToRunQueue(free_caps[i],t); cap->n_run_queue--; @@ -2194,9 +2236,6 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS) n_capabilities = enabled_capabilities = new_n_capabilities; } - // Start worker tasks on the new Capabilities - startWorkerTasks(old_n_capabilities, new_n_capabilities); - // We're done: release the original Capabilities releaseAllCapabilities(old_n_capabilities, cap,task); |