summaryrefslogtreecommitdiff
diff options
context:
space:
mode:
-rw-r--r--rts/Capability.c9
-rw-r--r--rts/Capability.h23
-rw-r--r--rts/Schedule.c35
-rw-r--r--rts/Schedule.h15
-rw-r--r--rts/sm/Sanity.c6
5 files changed, 48 insertions, 40 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 7ca220fbd9..f2220f0651 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -99,7 +99,7 @@ findSpark (Capability *cap)
rtsBool retry;
uint32_t i = 0;
- if (!emptyRunQueue(cap) || cap->returning_tasks_hd != NULL) {
+ if (!emptyRunQueue(cap) || cap->n_returning_tasks != 0) {
// If there are other threads, don't try to run any new
// sparks: sparks might be speculative, we don't want to take
// resources away from the main computation.
@@ -212,6 +212,7 @@ newReturningTask (Capability *cap, Task *task)
cap->returning_tasks_hd = task;
}
cap->returning_tasks_tl = task;
+ cap->n_returning_tasks++;
}
STATIC_INLINE Task *
@@ -226,6 +227,7 @@ popReturningTask (Capability *cap)
cap->returning_tasks_tl = NULL;
}
task->next = NULL;
+ cap->n_returning_tasks--;
return task;
}
#endif
@@ -249,6 +251,7 @@ initCapability (Capability *cap, uint32_t i)
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
+ cap->n_run_queue = 0;
#if defined(THREADED_RTS)
initMutex(&cap->lock);
@@ -256,8 +259,10 @@ initCapability (Capability *cap, uint32_t i)
cap->spare_workers = NULL;
cap->n_spare_workers = 0;
cap->suspended_ccalls = NULL;
+ cap->n_suspended_ccalls = 0;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
+ cap->n_returning_tasks = 0;
cap->inbox = (Message*)END_TSO_QUEUE;
cap->sparks = allocSparkPool();
cap->spark_stats.created = 0;
@@ -507,7 +512,7 @@ releaseCapability_ (Capability* cap,
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
- if (cap->returning_tasks_hd != NULL) {
+ if (cap->n_returning_tasks != 0) {
giveCapabilityToTask(cap,cap->returning_tasks_hd);
// The Task pops itself from the queue (see waitForCapability())
return;
diff --git a/rts/Capability.h b/rts/Capability.h
index 67b43280eb..6779624360 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -66,6 +66,7 @@ struct Capability_ {
// also lock-free.
StgTSO *run_queue_hd;
StgTSO *run_queue_tl;
+ uint32_t n_run_queue;
// Tasks currently making safe foreign calls. Doubly-linked.
// When returning, a task first acquires the Capability before
@@ -74,6 +75,7 @@ struct Capability_ {
// the returning_tasks list, we must also migrate its entry from
// this list.
InCall *suspended_ccalls;
+ uint32_t n_suspended_ccalls;
// One mutable list per generation, so we don't need to take any
// locks when updating an old-generation thunk. This also lets us
@@ -130,6 +132,7 @@ struct Capability_ {
// check whether it is NULL without taking the lock, however.
Task *returning_tasks_hd; // Singly-linked, with head/tail
Task *returning_tasks_tl;
+ uint32_t n_returning_tasks;
// Messages, or END_TSO_QUEUE.
// Locks required: cap->lock
@@ -171,15 +174,27 @@ struct Capability_ {
ASSERT(task->cap == cap); \
ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task)
+#if defined(THREADED_RTS)
+#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) \
+ ASSERT(cap->returning_tasks_hd == NULL ? \
+ cap->returning_tasks_tl == NULL && cap->n_returning_tasks == 0 \
+ : 1);
+#else
+#define ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task) /* nothing */
+#endif
+
// Sometimes a Task holds a Capability, but the Task is not associated
// with that Capability (ie. task->cap != cap). This happens when
// (a) a Task holds multiple Capabilities, and (b) when the current
// Task is bound, its thread has just blocked, and it may have been
// moved to another Capability.
-#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \
- ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \
- cap->run_queue_tl == END_TSO_QUEUE : 1); \
- ASSERT(myTask() == task); \
+#define ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) \
+ ASSERT(cap->run_queue_hd == END_TSO_QUEUE ? \
+ cap->run_queue_tl == END_TSO_QUEUE && cap->n_run_queue == 0 \
+ : 1); \
+ ASSERT(cap->suspended_ccalls == NULL ? cap->n_suspended_ccalls == 0 : 1); \
+ ASSERT_THREADED_CAPABILITY_INVARIANTS(cap,task); \
+ ASSERT(myTask() == task); \
ASSERT_TASK_ID(task);
#if defined(THREADED_RTS)
diff --git a/rts/Schedule.c b/rts/Schedule.c
index c3911afbe5..ee2d7dbb0d 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -574,6 +574,7 @@ removeFromRunQueue (Capability *cap, StgTSO *tso)
setTSOPrev(cap, tso->_link, tso->block_info.prev);
}
tso->_link = tso->block_info.prev = END_TSO_QUEUE;
+ cap->n_run_queue--;
IF_DEBUG(sanity, checkRunQueue(cap));
}
@@ -639,7 +640,7 @@ shouldYieldCapability (Capability *cap, Task *task, rtsBool didGcLast)
// progress at all.
return ((pending_sync && !didGcLast) ||
- cap->returning_tasks_hd != NULL ||
+ cap->n_returning_tasks != 0 ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? peekRunQueue(cap)->bound != NULL
: peekRunQueue(cap)->bound != task->incall)));
@@ -700,31 +701,15 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
Capability *free_caps[n_capabilities], *cap0;
uint32_t i, n_wanted_caps, n_free_caps;
- StgTSO *t;
// migration can be turned off with +RTS -qm
if (!RtsFlags.ParFlags.migrate) return;
- // Check whether we have more threads on our run queue, or sparks
- // in our pool, that we could hand to another Capability.
- if (emptyRunQueue(cap)) {
- if (sparkPoolSizeCap(cap) < 2) return;
- } else {
- if (singletonRunQueue(cap) &&
- sparkPoolSizeCap(cap) < 1) return;
- }
-
// Figure out how many capabilities we want to wake up. We need at least
// sparkPoolSize(cap) plus the number of spare threads we have.
- t = cap->run_queue_hd;
- n_wanted_caps = sparkPoolSizeCap(cap);
- if (t != END_TSO_QUEUE) {
- do {
- t = t->_link;
- if (t == END_TSO_QUEUE) break;
- n_wanted_caps++;
- } while (n_wanted_caps < n_capabilities-1);
- }
+ n_wanted_caps = sparkPoolSizeCap(cap) + cap->n_run_queue - 1;
+
+ if (n_wanted_caps == 0) return;
// First grab as many free Capabilities as we can. ToDo: we should use
// capabilities on the same NUMA node preferably, but not exclusively.
@@ -734,7 +719,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap0 = capabilities[i];
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
- || cap0->returning_tasks_hd != NULL
+ || cap0->n_returning_tasks != 0
|| cap0->inbox != (Message*)END_TSO_QUEUE) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
@@ -765,7 +750,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
debugTrace(DEBUG_sched,
"cap %d: %s and %d free capabilities, sharing...",
cap->no,
- (!emptyRunQueue(cap) && !singletonRunQueue(cap))?
+ (cap->n_run_queue > 1)?
"excess threads on run queue":"sparks to share (>=2)",
n_free_caps);
@@ -797,6 +782,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
prev = t;
} else {
appendToRunQueue(free_caps[i],t);
+ cap->n_run_queue--;
traceEventMigrateThread (cap, t, free_caps[i]->no);
@@ -2039,10 +2025,12 @@ forkProcess(HsStablePtr *entry
// bound threads for which the corresponding Task does not
// exist.
truncateRunQueue(cap);
+ cap->n_run_queue = 0;
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
cap->suspended_ccalls = NULL;
+ cap->n_suspended_ccalls = 0;
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
@@ -2051,6 +2039,7 @@ forkProcess(HsStablePtr *entry
cap->n_spare_workers = 0;
cap->returning_tasks_hd = NULL;
cap->returning_tasks_tl = NULL;
+ cap->n_returning_tasks = 0;
#endif
// Release all caps except 0, we'll use that for starting
@@ -2277,6 +2266,7 @@ suspendTask (Capability *cap, Task *task)
cap->suspended_ccalls->prev = incall;
}
cap->suspended_ccalls = incall;
+ cap->n_suspended_ccalls++;
}
STATIC_INLINE void
@@ -2295,6 +2285,7 @@ recoverSuspendedTask (Capability *cap, Task *task)
incall->next->prev = incall->prev;
}
incall->next = incall->prev = NULL;
+ cap->n_suspended_ccalls--;
}
/* ---------------------------------------------------------------------------
diff --git a/rts/Schedule.h b/rts/Schedule.h
index 03a78c9ae9..a8d1fb8b76 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -139,6 +139,7 @@ appendToRunQueue (Capability *cap, StgTSO *tso)
setTSOPrev(cap, tso, cap->run_queue_tl);
}
cap->run_queue_tl = tso;
+ cap->n_run_queue++;
}
/* Push a thread on the beginning of the run queue.
@@ -159,6 +160,7 @@ pushOnRunQueue (Capability *cap, StgTSO *tso)
if (cap->run_queue_tl == END_TSO_QUEUE) {
cap->run_queue_tl = tso;
}
+ cap->n_run_queue++;
}
/* Pop the first thread off the runnable queue.
@@ -176,6 +178,7 @@ popRunQueue (Capability *cap)
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
}
+ cap->n_run_queue--;
return t;
}
@@ -214,16 +217,7 @@ emptyQueue (StgTSO *q)
INLINE_HEADER rtsBool
emptyRunQueue(Capability *cap)
{
- return emptyQueue(cap->run_queue_hd);
-}
-
-/* assumes that the queue is not empty; so combine this with
- * an emptyRunQueue check! */
-INLINE_HEADER rtsBool
-singletonRunQueue(Capability *cap)
-{
- ASSERT(!emptyRunQueue(cap));
- return cap->run_queue_hd->_link == END_TSO_QUEUE;
+ return cap->n_run_queue == 0;
}
INLINE_HEADER void
@@ -231,6 +225,7 @@ truncateRunQueue(Capability *cap)
{
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
+ cap->n_run_queue = 0;
}
#if !defined(THREADED_RTS)
diff --git a/rts/sm/Sanity.c b/rts/sm/Sanity.c
index 6f6b15c4e8..f1b57eae66 100644
--- a/rts/sm/Sanity.c
+++ b/rts/sm/Sanity.c
@@ -839,12 +839,14 @@ checkRunQueue(Capability *cap)
{
StgTSO *prev, *tso;
prev = END_TSO_QUEUE;
- for (tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
- prev = tso, tso = tso->_link) {
+ uint32_t n;
+ for (n = 0, tso = cap->run_queue_hd; tso != END_TSO_QUEUE;
+ prev = tso, tso = tso->_link, n++) {
ASSERT(prev == END_TSO_QUEUE || prev->_link == tso);
ASSERT(tso->block_info.prev == prev);
}
ASSERT(cap->run_queue_tl == prev);
+ ASSERT(cap->n_run_queue == n);
}
/* -----------------------------------------------------------------------------