Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 94
1 file changed, 55 insertions(+), 39 deletions(-)
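
The diff below replaces plain loads and stores of shared scheduler state (sched_state, recent_activity, pending_sync, cap->context_switch, cap->interrupt, cap->idle, cap->n_returning_tasks) with the RTS's atomic accessor macros. As a reading aid, here is a minimal sketch of the intended semantics, assuming the accessors are thin wrappers over the GCC/Clang __atomic builtins; the RTS's real definitions live in its own headers and may differ in detail:

    /* Sketch only: illustrates the memory-ordering intent of the accessors
     * used in the diff below, not the RTS's actual definitions. */
    #define RELAXED_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_RELAXED)
    #define RELAXED_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_RELAXED)
    #define SEQ_CST_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_SEQ_CST)
    #define SEQ_CST_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_SEQ_CST)
    /* Increment whose load and store are individually data-race-free but which
     * is not an atomic read-modify-write; usable where a lost update is
     * tolerable, e.g. the per-capability idle counter below. */
    #define NONATOMIC_ADD(ptr, val)  RELAXED_STORE(ptr, RELAXED_LOAD(ptr) + (val))

In the hunks that follow, sched_state, context_switch, interrupt, idle and n_returning_tasks move to RELAXED accesses, while recent_activity and pending_sync use the stronger SEQ_CST ordering.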
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 3f04eb2af1..28849f32e8 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -203,6 +203,7 @@ schedule (Capability *initialCapability, Task *task)
   // Pre-condition: this task owns initialCapability.
   // The sched_mutex is *NOT* held
   // NB. on return, we still hold a capability.
+  ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

   debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);

@@ -210,6 +211,7 @@ schedule (Capability *initialCapability, Task *task)
   // Scheduler loop starts here:

   while (1) {
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

     // Check whether we have re-entered the RTS from Haskell without
     // going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -258,7 +260,7 @@ schedule (Capability *initialCapability, Task *task)
     // * We might be left with threads blocked in foreign calls,
     //   we should really attempt to kill these somehow (TODO).

-    switch (sched_state) {
+    switch (RELAXED_LOAD(&sched_state)) {
     case SCHED_RUNNING:
         break;
     case SCHED_INTERRUPTING:
@@ -270,7 +272,7 @@
         // other Capability did the final GC, or we did it above,
         // either way we can fall through to the SCHED_SHUTTING_DOWN
         // case now.
-        ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+        ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN);
         // fall through

     case SCHED_SHUTTING_DOWN:
@@ -372,7 +374,7 @@ schedule (Capability *initialCapability, Task *task)
     // killed, kill it now. This sometimes happens when a finalizer
     // thread is created by the final GC, or a thread previously
     // in a foreign call returns.
-    if (sched_state >= SCHED_INTERRUPTING &&
+    if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING &&
         !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
         deleteThread(t);
     }
@@ -403,7 +405,7 @@ schedule (Capability *initialCapability, Task *task)
      */
     if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
         && !emptyThreadQueues(cap)) {
-        cap->context_switch = 1;
+        RELAXED_STORE(&cap->context_switch, 1);
     }

 run_thread:
@@ -430,15 +432,15 @@ run_thread:
 #endif

     // reset the interrupt flag before running Haskell code
-    cap->interrupt = 0;
+    RELAXED_STORE(&cap->interrupt, false);

     cap->in_haskell = true;
-    cap->idle = 0;
+    RELAXED_STORE(&cap->idle, false);

     dirty_TSO(cap,t);
     dirty_STACK(cap,t->stackobj);

-    switch (recent_activity)
+    switch (SEQ_CST_LOAD(&recent_activity))
     {
     case ACTIVITY_DONE_GC: {
         // ACTIVITY_DONE_GC means we turned off the timer signal to
@@ -459,7 +461,7 @@ run_thread:
         // wakeUpRts().
         break;
     default:
-        recent_activity = ACTIVITY_YES;
+        SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
     }

     traceEventRunThread(cap, t);
@@ -652,8 +654,16 @@ shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
     // Capability keeps forcing a GC and the other Capabilities make no
     // progress at all.

-    return ((pending_sync && !didGcLast) ||
-            cap->n_returning_tasks != 0 ||
+    // Note [Data race in shouldYieldCapability]
+    // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+    // We would usually need to hold cap->lock to look at n_returning_tasks but
+    // we don't here since this is just an approximate predicate. We
+    // consequently need to use atomic accesses whenever touching
+    // n_returning_tasks. However, since this is an approximate predicate we can
+    // use a RELAXED ordering.
+
+    return ((RELAXED_LOAD(&pending_sync) && !didGcLast) ||
+            RELAXED_LOAD(&cap->n_returning_tasks) != 0 ||
             (!emptyRunQueue(cap) && (task->incall->tso == NULL
                                      ? peekRunQueue(cap)->bound != NULL
                                      : peekRunQueue(cap)->bound != task->incall)));
@@ -680,7 +690,7 @@ scheduleYield (Capability **pcap, Task *task)
     if (!shouldYieldCapability(cap,task,false) &&
         (!emptyRunQueue(cap) ||
          !emptyInbox(cap) ||
-         sched_state >= SCHED_INTERRUPTING)) {
+         RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) {
         return;
     }

@@ -739,7 +749,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
         cap0 = capabilities[i];
         if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
             if (!emptyRunQueue(cap0)
-                || cap0->n_returning_tasks != 0
+                || RELAXED_LOAD(&cap0->n_returning_tasks) != 0
                 || !emptyInbox(cap0)) {
                 // it already has some work, we just grabbed it at
                 // the wrong moment. Or maybe it's deadlocked!
@@ -821,7 +831,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
                 appendToRunQueue(free_caps[i],t);

                 traceEventMigrateThread (cap, t, free_caps[i]->no);
-                if (t->bound) { t->bound->task->cap = free_caps[i]; }
+                // See Note [Benign data race due to work-pushing].
+                if (t->bound) {
+                    t->bound->task->cap = free_caps[i];
+                }
                 t->cap = free_caps[i];
                 n--; // we have one fewer threads now
                 i++; // move on to the next free_cap
@@ -926,7 +939,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
      * we won't eagerly start a full GC just because we don't have
      * any threads to run currently.
      */
-    if (recent_activity != ACTIVITY_INACTIVE) return;
+    if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return;
 #endif

     if (task->incall->tso && task->incall->tso->why_blocked == BlockedOnIOCompletion) return;
@@ -1127,12 +1140,12 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
 static bool
 scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
 {
-    if (cap->r.rHpLim == NULL || cap->context_switch) {
+    if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) {
         // Sometimes we miss a context switch, e.g. when calling
         // primitives in a tight loop, MAYBE_GC() doesn't check the
         // context switch flag, and we end up waiting for a GC.
         // See #1984, and concurrent/should_run/1984
-        cap->context_switch = 0;
+        RELAXED_STORE(&cap->context_switch, 0);
         appendToRunQueue(cap,t);
     } else {
         pushOnRunQueue(cap,t);
@@ -1233,8 +1246,8 @@ scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
     // the CPU because the tick always arrives during GC). This way
     // penalises threads that do a lot of allocation, but that seems
     // better than the alternative.
-    if (cap->context_switch != 0) {
-        cap->context_switch = 0;
+    if (RELAXED_LOAD(&cap->context_switch) != 0) {
+        RELAXED_STORE(&cap->context_switch, 0);
         appendToRunQueue(cap,t);
     } else {
         pushOnRunQueue(cap,t);
@@ -1333,7 +1346,7 @@ scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
         if (task->incall->ret) {
             *(task->incall->ret) = NULL;
         }
-        if (sched_state >= SCHED_INTERRUPTING) {
+        if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) {
             if (heap_overflow) {
                 task->incall->rstat = HeapExhausted;
             } else {
@@ -1478,7 +1491,7 @@ static bool requestSync (
                        sync->type);
             ASSERT(*pcap);
             yieldCapability(pcap,task,true);
-            sync = pending_sync;
+            sync = SEQ_CST_LOAD(&pending_sync);
         } while (sync != NULL);
     }

@@ -1509,7 +1522,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
     Capability *tmpcap;
     uint32_t i;

-    ASSERT(pending_sync != NULL);
+    ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL);
     for (i=0; i < n_capabilities; i++) {
         debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
                    i, n_capabilities);
@@ -1581,7 +1594,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
     // cycle.
 #endif

-    if (sched_state == SCHED_SHUTTING_DOWN) {
+    if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
         // The final GC has already been done, and the system is
         // shutting down. We'll probably deadlock if we try to GC
         // now.
@@ -1596,7 +1609,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
     major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);

 #if defined(THREADED_RTS)
-    if (sched_state < SCHED_INTERRUPTING
+    if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING
         && RtsFlags.ParFlags.parGcEnabled
         && collect_gen >= RtsFlags.ParFlags.parGcGen
         && ! oldest_gen->mark)
@@ -1689,7 +1702,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
         }
         if (was_syncing &&
             (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
-            !(sched_state == SCHED_INTERRUPTING && force_major)) {
+            !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) {
             // someone else had a pending sync request for a GC, so
             // let's assume GC has been done and we don't need to GC
             // again.
@@ -1697,7 +1710,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
             // need to do the final GC.
             return;
         }
-        if (sched_state == SCHED_SHUTTING_DOWN) {
+        if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
             // The scheduler might now be shutting down. We tested
             // this above, but it might have become true since then as
             // we yielded the capability in requestSync().
@@ -1780,7 +1793,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
         debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);

         for (i=0; i < n_capabilities; i++) {
-            capabilities[i]->idle++;
+            NONATOMIC_ADD(&capabilities[i]->idle, 1);
         }

         // For all capabilities participating in this GC, wait until
@@ -1802,7 +1815,7 @@ delete_threads_and_gc:
      * threads in the system.
      * Checking for major_gc ensures that the last GC is major.
      */
-    if (sched_state == SCHED_INTERRUPTING && major_gc) {
+    if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) {
         deleteAllThreads();
 #if defined(THREADED_RTS)
         // Discard all the sparks from every Capability. Why?
@@ -1816,7 +1829,7 @@ delete_threads_and_gc:
             discardSparksCap(capabilities[i]);
         }
 #endif
-        sched_state = SCHED_SHUTTING_DOWN;
+        RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN);
     }

     /*
@@ -1861,20 +1874,20 @@ delete_threads_and_gc:
 #endif

     // If we're shutting down, don't leave any idle GC work to do.
-    if (sched_state == SCHED_SHUTTING_DOWN) {
+    if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
         doIdleGCWork(cap, true /* all of it */);
     }

     traceSparkCounters(cap);

-    switch (recent_activity) {
+    switch (SEQ_CST_LOAD(&recent_activity)) {
     case ACTIVITY_INACTIVE:
         if (force_major) {
             // We are doing a GC because the system has been idle for a
             // timeslice and we need to check for deadlock. Record the
             // fact that we've done a GC and turn off the timer signal;
             // it will get re-enabled if we run any threads after the GC.
-            recent_activity = ACTIVITY_DONE_GC;
+            SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
 #if !defined(PROFILING)
             stopTimer();
 #endif
@@ -1886,7 +1899,7 @@ delete_threads_and_gc:
         // the GC might have taken long enough for the timer to set
         // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
         // but we aren't necessarily deadlocked:
-        recent_activity = ACTIVITY_YES;
+        SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
         break;

     case ACTIVITY_DONE_GC:
@@ -1936,7 +1949,7 @@ delete_threads_and_gc:
         releaseGCThreads(cap, idle_cap);
     }
 #endif
-    if (heap_overflow && sched_state == SCHED_RUNNING) {
+    if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) {
         // GC set the heap_overflow flag. We should throw an exception if we
         // can, or shut down otherwise.
@@ -1948,7 +1961,7 @@ delete_threads_and_gc:
         // shutdown now. Ultimately we want the main thread to return to
         // its caller with HeapExhausted, at which point the caller should
         // call hs_exit(). The first step is to delete all the threads.
-        sched_state = SCHED_INTERRUPTING;
+        RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
         goto delete_threads_and_gc;
     }
@@ -2027,12 +2040,14 @@ forkProcess(HsStablePtr *entry
     ACQUIRE_LOCK(&sm_mutex);
     ACQUIRE_LOCK(&stable_ptr_mutex);
     ACQUIRE_LOCK(&stable_name_mutex);
-    ACQUIRE_LOCK(&task->lock);

     for (i=0; i < n_capabilities; i++) {
         ACQUIRE_LOCK(&capabilities[i]->lock);
     }

+    // Take task lock after capability lock to avoid order inversion (#17275).
+    ACQUIRE_LOCK(&task->lock);
+
 #if defined(THREADED_RTS)
     ACQUIRE_LOCK(&all_tasks_mutex);
 #endif
@@ -2634,8 +2649,9 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
 #if defined(THREADED_RTS)
 void scheduleWorker (Capability *cap, Task *task)
 {
-    // schedule() runs without a lock.
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
     cap = schedule(cap,task);
+    ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);

     // On exit from schedule(), we have a Capability, but possibly not
     // the same one we started with.
@@ -2695,7 +2711,7 @@ initScheduler(void)
 #endif

     sched_state = SCHED_RUNNING;
-    recent_activity = ACTIVITY_YES;
+    SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);

     /* Initialise the mutex and condition variables used by
@@ -2737,8 +2753,8 @@ exitScheduler (bool wait_foreign USED_IF_THREADS)
     Task *task = newBoundTask();

     // If we haven't killed all the threads yet, do it now.
-    if (sched_state < SCHED_SHUTTING_DOWN) {
-        sched_state = SCHED_INTERRUPTING;
+    if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) {
+        RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
         nonmovingStop();
         Capability *cap = task->cap;
         waitForCapability(&cap,task);
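
Note [Data race in shouldYieldCapability] above describes an intentionally approximate predicate: n_returning_tasks is peeked without holding cap->lock, and a stale answer only means the capability yields (or keeps running) slightly later than it otherwise would. The following is a hypothetical, self-contained illustration of that pattern using C11 atomics; the names are made up for the example and this is not RTS code:

    /* Hypothetical sketch of an "approximate predicate": a relaxed atomic peek
     * used as a hint, with the authoritative check made under the lock. */
    #include <pthread.h>
    #include <stdatomic.h>
    #include <stdbool.h>

    struct cap_like {
        pthread_mutex_t lock;
        atomic_int n_returning_tasks;   /* modified under lock, peeked without it */
    };

    /* Cheap hint: may be stale, so callers must tolerate both false positives
     * and false negatives (they simply retry or yield a little later). */
    static bool maybe_should_yield(struct cap_like *c)
    {
        return atomic_load_explicit(&c->n_returning_tasks,
                                    memory_order_relaxed) != 0;
    }

    /* Authoritative check, taken with the lock held before acting on it. */
    static bool should_yield_locked(struct cap_like *c)
    {
        pthread_mutex_lock(&c->lock);
        bool yield = atomic_load_explicit(&c->n_returning_tasks,
                                          memory_order_relaxed) != 0;
        pthread_mutex_unlock(&c->lock);
        return yield;
    }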
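
The forkProcess hunk above also moves ACQUIRE_LOCK(&task->lock) so that it is taken after the per-capability locks, keeping all threads' lock acquisition in a single global order and avoiding the order inversion referenced as #17275. The standalone pthreads example below (hypothetical names, not RTS code) shows the rule that change enforces: every thread takes the two locks in the same order, so no deadlock cycle can form.

    /* Hypothetical illustration of consistent lock ordering; not RTS code. */
    #include <pthread.h>
    #include <stdio.h>

    static pthread_mutex_t cap_lock  = PTHREAD_MUTEX_INITIALIZER;
    static pthread_mutex_t task_lock = PTHREAD_MUTEX_INITIALIZER;
    static int shared_counter = 0;

    /* Every thread takes cap_lock first, then task_lock: with one global
     * order there is no cycle, hence no deadlock. */
    static void *worker(void *arg)
    {
        for (int i = 0; i < 1000; i++) {
            pthread_mutex_lock(&cap_lock);
            pthread_mutex_lock(&task_lock);
            shared_counter++;
            pthread_mutex_unlock(&task_lock);
            pthread_mutex_unlock(&cap_lock);
        }
        return arg;
    }

    int main(void)
    {
        pthread_t a, b;
        pthread_create(&a, NULL, worker, NULL);
        pthread_create(&b, NULL, worker, NULL);
        pthread_join(a, NULL);
        pthread_join(b, NULL);
        printf("counter = %d\n", shared_counter);
        return 0;
    }

If one thread took task_lock before cap_lock while another did the opposite, each could end up holding one lock while waiting forever for the other; a single agreed order rules that out.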