Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--  rts/Schedule.c  |  94
1 file changed, 55 insertions, 39 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 3f04eb2af1..28849f32e8 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -203,6 +203,7 @@ schedule (Capability *initialCapability, Task *task)
// Pre-condition: this task owns initialCapability.
// The sched_mutex is *NOT* held
// NB. on return, we still hold a capability.
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
@@ -210,6 +211,7 @@ schedule (Capability *initialCapability, Task *task)
// Scheduler loop starts here:
while (1) {
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -258,7 +260,7 @@ schedule (Capability *initialCapability, Task *task)
// * We might be left with threads blocked in foreign calls,
// we should really attempt to kill these somehow (TODO).
- switch (sched_state) {
+ switch (RELAXED_LOAD(&sched_state)) {
case SCHED_RUNNING:
break;
case SCHED_INTERRUPTING:
@@ -270,7 +272,7 @@ schedule (Capability *initialCapability, Task *task)
// other Capability did the final GC, or we did it above,
// either way we can fall through to the SCHED_SHUTTING_DOWN
// case now.
- ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+ ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN);
// fall through
case SCHED_SHUTTING_DOWN:
@@ -372,7 +374,7 @@ schedule (Capability *initialCapability, Task *task)
// killed, kill it now. This sometimes happens when a finalizer
// thread is created by the final GC, or a thread previously
// in a foreign call returns.
- if (sched_state >= SCHED_INTERRUPTING &&
+ if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING &&
!(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
deleteThread(t);
}
@@ -403,7 +405,7 @@ schedule (Capability *initialCapability, Task *task)
*/
if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
&& !emptyThreadQueues(cap)) {
- cap->context_switch = 1;
+ RELAXED_STORE(&cap->context_switch, 1);
}
run_thread:
@@ -430,15 +432,15 @@ run_thread:
#endif
// reset the interrupt flag before running Haskell code
- cap->interrupt = 0;
+ RELAXED_STORE(&cap->interrupt, false);
cap->in_haskell = true;
- cap->idle = 0;
+ RELAXED_STORE(&cap->idle, false);
dirty_TSO(cap,t);
dirty_STACK(cap,t->stackobj);
- switch (recent_activity)
+ switch (SEQ_CST_LOAD(&recent_activity))
{
case ACTIVITY_DONE_GC: {
// ACTIVITY_DONE_GC means we turned off the timer signal to
@@ -459,7 +461,7 @@ run_thread:
// wakeUpRts().
break;
default:
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
}
traceEventRunThread(cap, t);
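For orientation, recent_activity (now read and written with SEQ_CST_LOAD/SEQ_CST_STORE) behaves roughly as modelled below: the timer handler walks it towards ACTIVITY_INACTIVE, the scheduler resets it to ACTIVITY_YES before running a thread (restarting the timer first in the ACTIVITY_DONE_GC case), and an idle-forced GC parks it at ACTIVITY_DONE_GC with the timer stopped. This is a simplified sketch under those assumptions, not the RTS code; the model names are hypothetical.

/* Simplified model (a sketch, not the RTS code) of the recent_activity
 * flag that this patch accesses with SEQ_CST_LOAD/SEQ_CST_STORE.       */
#define SEQ_CST_LOAD(ptr)       __atomic_load_n((ptr), __ATOMIC_SEQ_CST)
#define SEQ_CST_STORE(ptr, val) __atomic_store_n((ptr), (val), __ATOMIC_SEQ_CST)

typedef enum {
    ACTIVITY_YES,       /* a thread ran since the last tick               */
    ACTIVITY_MAYBE_NO,  /* one tick passed with no activity               */
    ACTIVITY_INACTIVE,  /* another idle tick: time to check for deadlock  */
    ACTIVITY_DONE_GC    /* idle GC done, timer off until a thread runs    */
} Activity;

static Activity recent_activity_model = ACTIVITY_YES;

/* Conceptually what the timer handler does on each tick. */
static void timer_tick(void)
{
    switch (SEQ_CST_LOAD(&recent_activity_model)) {
    case ACTIVITY_YES:
        SEQ_CST_STORE(&recent_activity_model, ACTIVITY_MAYBE_NO);
        break;
    case ACTIVITY_MAYBE_NO:
        SEQ_CST_STORE(&recent_activity_model, ACTIVITY_INACTIVE);
        break;
    default:
        break;  /* INACTIVE and DONE_GC are left for the scheduler */
    }
}

/* Conceptually what the scheduler does just before running a thread
 * (in the DONE_GC case it also restarts the timer first).              */
static void about_to_run_thread(void)
{
    SEQ_CST_STORE(&recent_activity_model, ACTIVITY_YES);
}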
@@ -652,8 +654,16 @@ shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
// Capability keeps forcing a GC and the other Capabilities make no
// progress at all.
- return ((pending_sync && !didGcLast) ||
- cap->n_returning_tasks != 0 ||
+ // Note [Data race in shouldYieldCapability]
+ // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ // We would usually need to hold cap->lock to look at n_returning_tasks but
+ // we don't here since this is just an approximate predicate. We
+ // consequently need to use atomic accesses whenever touching
+ // n_returning_tasks. However, since this is an approximate predicate we can
+ // use a RELAXED ordering.
+
+ return ((RELAXED_LOAD(&pending_sync) && !didGcLast) ||
+ RELAXED_LOAD(&cap->n_returning_tasks) != 0 ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? peekRunQueue(cap)->bound != NULL
: peekRunQueue(cap)->bound != task->incall)));
@@ -680,7 +690,7 @@ scheduleYield (Capability **pcap, Task *task)
if (!shouldYieldCapability(cap,task,false) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- sched_state >= SCHED_INTERRUPTING)) {
+ RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) {
return;
}
@@ -739,7 +749,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap0 = capabilities[i];
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
- || cap0->n_returning_tasks != 0
+ || RELAXED_LOAD(&cap0->n_returning_tasks) != 0
|| !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
@@ -821,7 +831,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
appendToRunQueue(free_caps[i],t);
traceEventMigrateThread (cap, t, free_caps[i]->no);
- if (t->bound) { t->bound->task->cap = free_caps[i]; }
+ // See Note [Benign data race due to work-pushing].
+ if (t->bound) {
+ t->bound->task->cap = free_caps[i];
+ }
t->cap = free_caps[i];
n--; // we have one fewer threads now
i++; // move on to the next free_cap
@@ -926,7 +939,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
* we won't eagerly start a full GC just because we don't have
* any threads to run currently.
*/
- if (recent_activity != ACTIVITY_INACTIVE) return;
+ if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return;
#endif
if (task->incall->tso && task->incall->tso->why_blocked == BlockedOnIOCompletion) return;
@@ -1127,12 +1140,12 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
static bool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
- if (cap->r.rHpLim == NULL || cap->context_switch) {
+ if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
- cap->context_switch = 0;
+ RELAXED_STORE(&cap->context_switch, 0);
appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
@@ -1233,8 +1246,8 @@ scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
// the CPU because the tick always arrives during GC). This way
// penalises threads that do a lot of allocation, but that seems
// better than the alternative.
- if (cap->context_switch != 0) {
- cap->context_switch = 0;
+ if (RELAXED_LOAD(&cap->context_switch) != 0) {
+ RELAXED_STORE(&cap->context_switch, 0);
appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
@@ -1333,7 +1346,7 @@ scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
if (task->incall->ret) {
*(task->incall->ret) = NULL;
}
- if (sched_state >= SCHED_INTERRUPTING) {
+ if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) {
if (heap_overflow) {
task->incall->rstat = HeapExhausted;
} else {
@@ -1478,7 +1491,7 @@ static bool requestSync (
sync->type);
ASSERT(*pcap);
yieldCapability(pcap,task,true);
- sync = pending_sync;
+ sync = SEQ_CST_LOAD(&pending_sync);
} while (sync != NULL);
}
@@ -1509,7 +1522,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
Capability *tmpcap;
uint32_t i;
- ASSERT(pending_sync != NULL);
+ ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL);
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
i, n_capabilities);
@@ -1581,7 +1594,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// cycle.
#endif
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
@@ -1596,7 +1609,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
#if defined(THREADED_RTS)
- if (sched_state < SCHED_INTERRUPTING
+ if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING
&& RtsFlags.ParFlags.parGcEnabled
&& collect_gen >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->mark)
@@ -1689,7 +1702,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
}
if (was_syncing &&
(prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
- !(sched_state == SCHED_INTERRUPTING && force_major)) {
+ !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
@@ -1697,7 +1710,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// need to do the final GC.
return;
}
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
// The scheduler might now be shutting down. We tested
// this above, but it might have become true since then as
// we yielded the capability in requestSync().
@@ -1780,7 +1793,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
for (i=0; i < n_capabilities; i++) {
- capabilities[i]->idle++;
+ NONATOMIC_ADD(&capabilities[i]->idle, 1);
}
// For all capabilities participating in this GC, wait until
@@ -1802,7 +1815,7 @@ delete_threads_and_gc:
* threads in the system.
* Checking for major_gc ensures that the last GC is major.
*/
- if (sched_state == SCHED_INTERRUPTING && major_gc) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) {
deleteAllThreads();
#if defined(THREADED_RTS)
// Discard all the sparks from every Capability. Why?
@@ -1816,7 +1829,7 @@ delete_threads_and_gc:
discardSparksCap(capabilities[i]);
}
#endif
- sched_state = SCHED_SHUTTING_DOWN;
+ RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN);
}
/*
@@ -1861,20 +1874,20 @@ delete_threads_and_gc:
#endif
// If we're shutting down, don't leave any idle GC work to do.
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
doIdleGCWork(cap, true /* all of it */);
}
traceSparkCounters(cap);
- switch (recent_activity) {
+ switch (SEQ_CST_LOAD(&recent_activity)) {
case ACTIVITY_INACTIVE:
if (force_major) {
// We are doing a GC because the system has been idle for a
// timeslice and we need to check for deadlock. Record the
// fact that we've done a GC and turn off the timer signal;
// it will get re-enabled if we run any threads after the GC.
- recent_activity = ACTIVITY_DONE_GC;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
#if !defined(PROFILING)
stopTimer();
#endif
@@ -1886,7 +1899,7 @@ delete_threads_and_gc:
// the GC might have taken long enough for the timer to set
// recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
// but we aren't necessarily deadlocked:
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
break;
case ACTIVITY_DONE_GC:
@@ -1936,7 +1949,7 @@ delete_threads_and_gc:
releaseGCThreads(cap, idle_cap);
}
#endif
- if (heap_overflow && sched_state == SCHED_RUNNING) {
+ if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) {
// GC set the heap_overflow flag. We should throw an exception if we
// can, or shut down otherwise.
@@ -1948,7 +1961,7 @@ delete_threads_and_gc:
// shutdown now. Ultimately we want the main thread to return to
// its caller with HeapExhausted, at which point the caller should
// call hs_exit(). The first step is to delete all the threads.
- sched_state = SCHED_INTERRUPTING;
+ RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
goto delete_threads_and_gc;
}
@@ -2027,12 +2040,14 @@ forkProcess(HsStablePtr *entry
ACQUIRE_LOCK(&sm_mutex);
ACQUIRE_LOCK(&stable_ptr_mutex);
ACQUIRE_LOCK(&stable_name_mutex);
- ACQUIRE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
ACQUIRE_LOCK(&capabilities[i]->lock);
}
+ // Take task lock after capability lock to avoid order inversion (#17275).
+ ACQUIRE_LOCK(&task->lock);
+
#if defined(THREADED_RTS)
ACQUIRE_LOCK(&all_tasks_mutex);
#endif
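The forkProcess hunk above is purely about lock acquisition order. A small sketch of the rule it enforces, with hypothetical pthread locks standing in for capability->lock and task->lock:

/* Every path that needs both locks must take them in the same global
 * order.  If one path took task_lock first while another took cap_lock
 * first, two threads could each hold one lock while waiting on the
 * other and deadlock; a single agreed order rules that out.            */
#include <pthread.h>

static pthread_mutex_t cap_lock  = PTHREAD_MUTEX_INITIALIZER;
static pthread_mutex_t task_lock = PTHREAD_MUTEX_INITIALIZER;

/* Agreed order: capability lock first, task lock second. */
static void with_cap_then_task(void (*body)(void))
{
    pthread_mutex_lock(&cap_lock);
    pthread_mutex_lock(&task_lock);
    body();
    pthread_mutex_unlock(&task_lock);
    pthread_mutex_unlock(&cap_lock);
}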
@@ -2634,8 +2649,9 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
#if defined(THREADED_RTS)
void scheduleWorker (Capability *cap, Task *task)
{
- // schedule() runs without a lock.
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
cap = schedule(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
// On exit from schedule(), we have a Capability, but possibly not
// the same one we started with.
@@ -2695,7 +2711,7 @@ initScheduler(void)
#endif
sched_state = SCHED_RUNNING;
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
/* Initialise the mutex and condition variables used by
@@ -2737,8 +2753,8 @@ exitScheduler (bool wait_foreign USED_IF_THREADS)
Task *task = newBoundTask();
// If we haven't killed all the threads yet, do it now.
- if (sched_state < SCHED_SHUTTING_DOWN) {
- sched_state = SCHED_INTERRUPTING;
+ if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) {
+ RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
nonmovingStop();
Capability *cap = task->cap;
waitForCapability(&cap,task);