diff options
author | Simon Marlow <marlowsd@gmail.com> | 2016-08-02 09:55:31 +0100 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2016-08-03 08:07:34 +0100 |
commit | 55f5aed756cd5d464942dddcb33e0bd19b05f2a4 (patch) | |
tree | f8c0acd76b0945d44cca86946bd638ceee440aa3 | |
parent | 36565a9ba200d40e0be8407e57ada1b4a1c55814 (diff) | |
download | haskell-55f5aed756cd5d464942dddcb33e0bd19b05f2a4.tar.gz |
Track the lengths of the thread queues
Summary:
Knowing the length of the run queue in O(1) time is useful: for example
we don't have to traverse the run queue to know how many threads we have
to migrate in schedulePushWork().
Test Plan: validate
Reviewers: ezyang, erikd, bgamari, austin
Subscribers: thomie
Differential Revision: https://phabricator.haskell.org/D2437
-rw-r--r-- | rts/Capability.c | 9 | ||||
-rw-r--r-- | rts/Capability.h | 23 | ||||
-rw-r--r-- | rts/Schedule.c | 35 | ||||
-rw-r--r-- | rts/Schedule.h | 15 | ||||
-rw-r--r-- | rts/sm/Sanity.c | 6 |
5 files changed, 48 insertions, 40 deletions
diff --git a/rts/Capability.c b/rts/Capability.c index 7ca220fbd9..f2220f0651 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -99,7 +99,7 @@ findSpark (Capability *cap) rtsBool retry; uint32_t i = 0; - if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) { + if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) { // If there are other threads, don't try to run any new // sparks: sparks might be speculative, we don't want to take // resources away from the main computation. @@ -212,6 +212,7 @@ newReturningTask (Capability *cap, Task *task) cap->returning_tasks_hd = task; } cap->returning_tasks_tl = task; + cap->n_returning_tasks++; } STATIC_INLINE Task * @@ -226,6 +227,7 @@ popReturningTask (Capability *cap) cap->returning_tasks_tl = NULL; } task->next = NULL; + cap->n_returning_tasks--; return task; } #endif @@ -249,6 +251,7 @@ initCapability (Capability *cap, uint32_t i) cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; + cap->n_run_queue = 0; #if defined(THREADED_RTS) initMutex(&cap->lock); @@ -256,8 +259,10 @@ initCapability (Capability *cap, uint32_t i) cap->spare_workers = NULL; cap->n_spare_workers = 0; cap->suspended_ccalls = NULL; + cap->n_suspended_ccalls = 0; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; + cap->n_returning_tasks = 0; cap->inbox = (Message*)END_TSO_QUEUE; cap->sparks = allocSparkPool(); cap->spark_stats.created = 0; @@ -507,7 +512,7 @@ releaseCapability_ (Capability* cap, // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. - if (cap->returning_tasks_hd != NULL) { + if (cap->n_returning_tasks != 0) { giveCapabilityToTask(cap,cap->returning_tasks_hd); // The Task pops itself from the queue (see waitForCapability()) return; diff --git a/rts/Capability.h b/rts/Capability.h index 67b43280eb..6779624360 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -66,6 +66,7 @@ struct Capability_ { // also lock-free. StgTSO *run_queue_hd; StgTSO *run_queue_tl; + uint32_t n_run_queue; // Tasks currently making safe foreign calls. Doubly-linked. // When returning, a task first acquires the Capability before @@ -74,6 +75,7 @@ struct Capability_ { // the returning_tasks list, we must also migrate its entry from // this list. InCall *suspended_ccalls; + uint32_t n_suspended_ccalls; // One mutable list per generation, so we don't need to take any // locks when updating an old-generation thunk. This also lets us @@ -130,6 +132,7 @@ struct Capability_ { // check whether it is NULL without taking the lock, however. Task *returning_tasks_hd; // Singly-linked, with head/tail Task *returning_tasks_tl; + uint32_t n_returning_tasks; // Messages, or END_TSO_QUEUE. // Locks required: cap->lock @@ -171,15 +174,27 @@ struct Capability_ { ASSERT(task->cap == cap); \ ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) +#if defined(THREADED_RTS) +#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) \ + ASSERT(cap->returning_tasks_hd == NULL ? \ + cap->returning_tasks_tl == NULL && cap->n_returning_tasks == 0 \ + : 1); +#else +#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) /* nothing */ +#endif + // Sometimes a Task holds a Capability, but the Task is not associated // with that Capability (ie. task->cap != cap). This happens when // (a) a Task holds multiple Capabilities, and (b) when the current // Task is bound, its thread has just blocked, and it may have been // moved to another Capability. -#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \ - ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \ - cap->run_queue_tl == END_TSO_QUEUE : 1); \ - ASSERT(myTask() == task); \ +#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \ + ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \ + cap->run_queue_tl == END_TSO_QUEUE && cap->n_run_queue == 0 \ + : 1); \ + ASSERT(cap->suspended_ccalls == NULL ? cap->n_suspended_ccalls == 0 : 1); \ + ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task); \ + ASSERT(myTask() == task); \ ASSERT_TASK_ID(task); #if defined(THREADED_RTS) diff --git a/rts/Schedule.c b/rts/Schedule.c index c3911afbe5..ee2d7dbb0d 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -574,6 +574,7 @@ removeFromRunQueue (Capability *cap, StgTSO *tso) setTSOPrev(cap, tso->_link, tso->block_info.prev); } tso->_link = tso->block_info.prev = END_TSO_QUEUE; + cap->n_run_queue--; IF_DEBUG(sanity, checkRunQueue(cap)); } @@ -639,7 +640,7 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast) // progress at all. return ((pending_sync && !didGcLast) || - cap->returning_tasks_hd != NULL || + cap->n_returning_tasks != 0 || (!emptyRunQueue(cap) && (task->incall->tso == NULL ? peekRunQueue(cap)->bound != NULL : peekRunQueue(cap)->bound != task->incall))); @@ -700,31 +701,15 @@ schedulePushWork(Capability *cap USED_IF_THREADS, Capability *free_caps[n_capabilities], *cap0; uint32_t i, n_wanted_caps, n_free_caps; - StgTSO *t; // migration can be turned off with +RTS -qm if (!RtsFlags.ParFlags.migrate) return; - // Check whether we have more threads on our run queue, or sparks - // in our pool, that we could hand to another Capability. - if (emptyRunQueue(cap)) { - if (sparkPoolSizeCap(cap) < 2) return; - } else { - if (singletonRunQueue(cap) && - sparkPoolSizeCap(cap) < 1) return; - } - // Figure out how many capabilities we want to wake up. We need at least // sparkPoolSize(cap) plus the number of spare threads we have. - t = cap->run_queue_hd; - n_wanted_caps = sparkPoolSizeCap(cap); - if (t != END_TSO_QUEUE) { - do { - t = t->_link; - if (t == END_TSO_QUEUE) break; - n_wanted_caps++; - } while (n_wanted_caps < n_capabilities-1); - } + n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1; + + if (n_wanted_caps == 0) return; // First grab as many free Capabilities as we can. ToDo: we should use // capabilities on the same NUMA node preferably, but not exclusively. @@ -734,7 +719,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, cap0 = capabilities[i]; if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) { if (!emptyRunQueue(cap0) - || cap0->returning_tasks_hd != NULL + || cap0->n_returning_tasks != 0 || cap0->inbox != (Message*)END_TSO_QUEUE) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! @@ -765,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, debugTrace(DEBUG_sched, "cap %d: %s and %d free capabilities, sharing...", cap->no, - (!emptyRunQueue(cap) && !singletonRunQueue(cap))? + (cap->n_run_queue > 1)? "excess threads on run queue":"sparks to share (>=2)", n_free_caps); @@ -797,6 +782,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, prev = t; } else { appendToRunQueue(free_caps[i],t); + cap->n_run_queue--; traceEventMigrateThread (cap, t, free_caps[i]->no); @@ -2039,10 +2025,12 @@ forkProcess(HsStablePtr *entry // bound threads for which the corresponding Task does not // exist. truncateRunQueue(cap); + cap->n_run_queue = 0; // Any suspended C-calling Tasks are no more, their OS threads // don't exist now: cap->suspended_ccalls = NULL; + cap->n_suspended_ccalls = 0; #if defined(THREADED_RTS) // Wipe our spare workers list, they no longer exist. New @@ -2051,6 +2039,7 @@ forkProcess(HsStablePtr *entry cap->n_spare_workers = 0; cap->returning_tasks_hd = NULL; cap->returning_tasks_tl = NULL; + cap->n_returning_tasks = 0; #endif // Release all caps except 0, we'll use that for starting @@ -2277,6 +2266,7 @@ suspendTask (Capability *cap, Task *task) cap->suspended_ccalls->prev = incall; } cap->suspended_ccalls = incall; + cap->n_suspended_ccalls++; } STATIC_INLINE void @@ -2295,6 +2285,7 @@ recoverSuspendedTask (Capability *cap, Task *task) incall->next->prev = incall->prev; } incall->next = incall->prev = NULL; + cap->n_suspended_ccalls--; } /* --------------------------------------------------------------------------- diff --git a/rts/Schedule.h b/rts/Schedule.h index 03a78c9ae9..a8d1fb8b76 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -139,6 +139,7 @@ appendToRunQueue (Capability *cap, StgTSO *tso) setTSOPrev(cap, tso, cap->run_queue_tl); } cap->run_queue_tl = tso; + cap->n_run_queue++; } /* Push a thread on the beginning of the run queue. @@ -159,6 +160,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso) if (cap->run_queue_tl == END_TSO_QUEUE) { cap->run_queue_tl = tso; } + cap->n_run_queue++; } /* Pop the first thread off the runnable queue. @@ -176,6 +178,7 @@ popRunQueue (Capability *cap) if (cap->run_queue_hd == END_TSO_QUEUE) { cap->run_queue_tl = END_TSO_QUEUE; } + cap->n_run_queue--; return t; } @@ -214,16 +217,7 @@ emptyQueue (StgTSO *q) INLINE_HEADER rtsBool emptyRunQueue(Capability *cap) { - return emptyQueue(cap->run_queue_hd); -} - -/* assumes that the queue is not empty; so combine this with - * an emptyRunQueue check! */ -INLINE_HEADER rtsBool -singletonRunQueue(Capability *cap) -{ - ASSERT(!emptyRunQueue(cap)); - return cap->run_queue_hd->_link == END_TSO_QUEUE; + return cap->n_run_queue == 0; } INLINE_HEADER void @@ -231,6 +225,7 @@ truncateRunQueue(Capability *cap) { cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; + cap->n_run_queue = 0; } #if !defined(THREADED_RTS) diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c index 6f6b15c4e8..f1b57eae66 100644 --- a/rts/sm/Sanity.c +++ b/rts/sm/Sanity.c @@ -839,12 +839,14 @@ checkRunQueue(Capability *cap) { StgTSO *prev, *tso; prev = END_TSO_QUEUE; - for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE; - prev = tso, tso = tso->_link) { + uint32_t n; + for (n = 0, tso = cap->run_queue_hd; tso != END_TSO_QUEUE; + prev = tso, tso = tso->_link, n++) { ASSERT(prev == END_TSO_QUEUE || prev->_link == tso); ASSERT(tso->block_info.prev == prev); } ASSERT(cap->run_queue_tl == prev); + ASSERT(cap->n_run_queue == n); } /* ----------------------------------------------------------------------------- |