diff options
57 files changed, 1048 insertions, 735 deletions
diff --git a/.gitlab-ci.yml b/.gitlab-ci.yml index 796145dc45..fe96216272 100644 --- a/.gitlab-ci.yml +++ b/.gitlab-ci.yml @@ -681,6 +681,25 @@ nightly-x86_64-linux-deb9-integer-simple: TEST_ENV: "x86_64-linux-deb9-integer-simple" TEST_TYPE: slowtest +.build-x86_64-linux-deb9-tsan: + extends: .validate-linux-hadrian + stage: full-build + variables: + TEST_ENV: "x86_64-linux-deb9-tsan" + BUILD_FLAVOUR: "thread-sanitizer" + TSAN_OPTIONS: "suppressions=$CI_PROJECT_DIR/rts/.tsan-suppressions" + # Haddock is large enough to make TSAN choke without massive quantities of + # memory. + HADRIAN_ARGS: "--docs=none" + +nightly-x86_64-linux-deb9-tsan: + <<: *nightly + extends: .build-x86_64-linux-deb9-tsan + +validate-x86_64-linux-deb9-tsan: + extends: .build-x86_64-linux-deb9-tsan + when: manual + validate-x86_64-linux-deb9-dwarf: extends: .build-x86_64-linux-deb9 stage: full-build diff --git a/hadrian/src/Flavour.hs b/hadrian/src/Flavour.hs index 0a4439827f..d2adbe356e 100644 --- a/hadrian/src/Flavour.hs +++ b/hadrian/src/Flavour.hs @@ -124,4 +124,5 @@ enableThreadSanitizer = addArgs $ mconcat , builder (Ghc LinkHs) ? arg "-optl-fsanitize=thread" , builder (Cc CompileC) ? (arg "-fsanitize=thread" <> arg "-DTSAN_ENABLED") , builder (Cabal Flags) ? arg "thread-sanitizer" + , builder RunTest ? arg "--config=have_thread_sanitizer=True" ] diff --git a/includes/rts/OSThreads.h b/includes/rts/OSThreads.h index a68f1ea140..21b92950b2 100644 --- a/includes/rts/OSThreads.h +++ b/includes/rts/OSThreads.h @@ -164,7 +164,8 @@ typedef void* OSThreadProcAttr OSThreadProc(void *); extern int createOSThread ( OSThreadId* tid, char *name, OSThreadProc *startProc, void *param); extern bool osThreadIsAlive ( OSThreadId id ); -extern void interruptOSThread (OSThreadId id); +extern void interruptOSThread ( OSThreadId id ); +extern void joinOSThread ( OSThreadId id ); // // Condition Variables diff --git a/includes/rts/SpinLock.h b/includes/rts/SpinLock.h index 0ac51455dd..c1fe6c866c 100644 --- a/includes/rts/SpinLock.h +++ b/includes/rts/SpinLock.h @@ -39,19 +39,14 @@ typedef struct SpinLock_ #define IF_PROF_SPIN(x) #endif +void acquire_spin_lock_slow_path(SpinLock * p); + // acquire spin lock INLINE_HEADER void ACQUIRE_SPIN_LOCK(SpinLock * p) { - do { - for (uint32_t i = 0; i < SPIN_COUNT; i++) { - StgWord32 r = cas((StgVolatilePtr)&(p->lock), 1, 0); - if (r != 0) return; - IF_PROF_SPIN(__atomic_fetch_add(&p->spin, 1, __ATOMIC_RELAXED)); - busy_wait_nop(); - } - IF_PROF_SPIN(__atomic_fetch_add(&p->yield, 1, __ATOMIC_RELAXED)); - yieldThread(); - } while (1); + StgWord32 r = cas((StgVolatilePtr)&(p->lock), 1, 0); + if (RTS_UNLIKELY(r == 0)) + acquire_spin_lock_slow_path(p); } // release spin lock diff --git a/includes/rts/StablePtr.h b/includes/rts/StablePtr.h index f42c353d2b..56113b9f81 100644 --- a/includes/rts/StablePtr.h +++ b/includes/rts/StablePtr.h @@ -31,5 +31,9 @@ extern DLL_IMPORT_RTS spEntry *stable_ptr_table; EXTERN_INLINE StgPtr deRefStablePtr(StgStablePtr sp) { - return stable_ptr_table[(StgWord)sp].addr; + // acquire load to ensure that we see the new SPT if it has been recently + // enlarged. + const spEntry *spt = ACQUIRE_LOAD(&stable_ptr_table); + // acquire load to ensure that the referenced object is visible. + return ACQUIRE_LOAD(&spt[(StgWord)sp].addr); } diff --git a/includes/rts/TSANUtils.h b/includes/rts/TSANUtils.h index 00f226d9c6..72f4541a89 100644 --- a/includes/rts/TSANUtils.h +++ b/includes/rts/TSANUtils.h @@ -40,6 +40,10 @@ #endif #if defined(TSAN_ENABLED) +#if !defined(HAVE_C11_ATOMICS) +#error TSAN cannot be enabled without C11 atomics suppoort. +#endif + #define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) \ AnnotateHappensBefore(__FILE__, __LINE__, (void*)(addr)) #define TSAN_ANNOTATE_HAPPENS_AFTER(addr) \ diff --git a/includes/rts/storage/Closures.h b/includes/rts/storage/Closures.h index 3196efd3de..981e162ec1 100644 --- a/includes/rts/storage/Closures.h +++ b/includes/rts/storage/Closures.h @@ -340,9 +340,9 @@ typedef struct StgTVarWatchQueue_ { typedef struct { StgHeader header; - StgClosure *volatile current_value; - StgTVarWatchQueue *volatile first_watch_queue_entry; - StgInt volatile num_updates; + StgClosure *current_value; /* accessed via atomics */ + StgTVarWatchQueue *first_watch_queue_entry; /* accessed via atomics */ + StgInt num_updates; /* accessed via atomics */ } StgTVar; /* new_value == expected_value for read-only accesses */ diff --git a/includes/rts/storage/GC.h b/includes/rts/storage/GC.h index 9f4a0dde07..e8dc05048a 100644 --- a/includes/rts/storage/GC.h +++ b/includes/rts/storage/GC.h @@ -247,9 +247,9 @@ extern bool keepCAFs; INLINE_HEADER void initBdescr(bdescr *bd, generation *gen, generation *dest) { - bd->gen = gen; - bd->gen_no = gen->no; - bd->dest_no = dest->no; + RELAXED_STORE(&bd->gen, gen); + RELAXED_STORE(&bd->gen_no, gen->no); + RELAXED_STORE(&bd->dest_no, dest->no); #if !IN_STG_CODE /* See Note [RtsFlags is a pointer in STG code] */ diff --git a/includes/stg/SMP.h b/includes/stg/SMP.h index fa52a913c4..389dd95c88 100644 --- a/includes/stg/SMP.h +++ b/includes/stg/SMP.h @@ -440,6 +440,7 @@ load_load_barrier(void) { // Relaxed atomic operations. #define RELAXED_LOAD(ptr) __atomic_load_n(ptr, __ATOMIC_RELAXED) #define RELAXED_STORE(ptr,val) __atomic_store_n(ptr, val, __ATOMIC_RELAXED) +#define RELAXED_ADD(ptr,val) __atomic_add_fetch(ptr, val, __ATOMIC_RELAXED) // Acquire/release atomic operations #define ACQUIRE_LOAD(ptr) __atomic_load_n(ptr, __ATOMIC_ACQUIRE) @@ -453,6 +454,14 @@ load_load_barrier(void) { // Non-atomic addition for "approximate" counters that can be lossy #define NONATOMIC_ADD(ptr,val) RELAXED_STORE(ptr, RELAXED_LOAD(ptr) + val) +// Explicit fences +// +// These are typically necessary only in very specific cases (e.g. WSDeque) +// where the ordered operations aren't expressive enough to capture the desired +// ordering. +#define RELEASE_FENCE() __atomic_thread_fence(__ATOMIC_RELEASE) +#define SEQ_CST_FENCE() __atomic_thread_fence(__ATOMIC_SEQ_CST) + /* ---------------------------------------------------------------------- */ #else /* !THREADED_RTS */ @@ -466,6 +475,7 @@ EXTERN_INLINE void load_load_barrier () {} /* nothing */ // Relaxed atomic operations #define RELAXED_LOAD(ptr) *ptr #define RELAXED_STORE(ptr,val) *ptr = val +#define RELAXED_ADD(ptr,val) *ptr += val // Acquire/release atomic operations #define ACQUIRE_LOAD(ptr) *ptr @@ -479,6 +489,10 @@ EXTERN_INLINE void load_load_barrier () {} /* nothing */ // Non-atomic addition for "approximate" counters that can be lossy #define NONATOMIC_ADD(ptr,val) *ptr += val +// Fences +#define RELEASE_FENCE() +#define SEQ_CST_FENCE() + #if !IN_STG_CODE || IN_STGCRUN INLINE_HEADER StgWord xchg(StgPtr p, StgWord w) @@ -527,6 +541,9 @@ atomic_dec(StgVolatilePtr p) } #endif +/* An alias for the C11 declspec */ +#define ATOMIC + #define VOLATILE_LOAD(p) ((StgWord)*((StgWord*)(p))) #endif /* !THREADED_RTS */ diff --git a/libraries/base/GHC/Event/Control.hs b/libraries/base/GHC/Event/Control.hs index a9f23e07d2..9054da4f22 100644 --- a/libraries/base/GHC/Event/Control.hs +++ b/libraries/base/GHC/Event/Control.hs @@ -123,6 +123,10 @@ newControl shouldRegister = allocaArray 2 $ \fds -> do -- the RTS, then *BEFORE* the wakeup file is closed, we must call -- c_setIOManagerWakeupFd (-1), so that the RTS does not try to use the wakeup -- file after it has been closed. +-- +-- Note, however, that even if we do the above, this function is still racy +-- since we do not synchronize between here and ioManagerWakeup. +-- ioManagerWakeup ignores failures that arise from this case. closeControl :: Control -> IO () closeControl w = do _ <- atomicSwapIORef (controlIsDead w) True diff --git a/rts/.tsan-suppressions b/rts/.tsan-suppressions index b990c6cfc1..734faff5a6 100644 --- a/rts/.tsan-suppressions +++ b/rts/.tsan-suppressions @@ -7,3 +7,8 @@ race:capability_is_busy # This is a benign race during IO manager shutdown (between ioManagerWakeup # and GHC.Event.Control.closeControl). race:ioManagerWakeup +race:base_GHCziEventziControl_zdwcloseControl_info + +# This is a potentially problematic race which I have yet to work out +# (#17289) +race:handle_tick diff --git a/rts/Capability.c b/rts/Capability.c index 181075b963..77ca5e8c0d 100644 --- a/rts/Capability.c +++ b/rts/Capability.c @@ -84,8 +84,8 @@ Capability * rts_unsafeGetMyCapability (void) STATIC_INLINE bool globalWorkToDo (void) { - return sched_state >= SCHED_INTERRUPTING - || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock + return RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING + || RELAXED_LOAD(&recent_activity) == ACTIVITY_INACTIVE; // need to check for deadlock } #endif @@ -211,7 +211,10 @@ newReturningTask (Capability *cap, Task *task) cap->returning_tasks_hd = task; } cap->returning_tasks_tl = task; - cap->n_returning_tasks++; + + // See Note [Data race in shouldYieldCapability] in Schedule.c. + RELAXED_ADD(&cap->n_returning_tasks, 1); + ASSERT_RETURNING_TASKS(cap,task); } @@ -227,7 +230,10 @@ popReturningTask (Capability *cap) cap->returning_tasks_tl = NULL; } task->next = NULL; - cap->n_returning_tasks--; + + // See Note [Data race in shouldYieldCapability] in Schedule.c. + RELAXED_ADD(&cap->n_returning_tasks, -1); + ASSERT_RETURNING_TASKS(cap,task); return task; } @@ -410,36 +416,44 @@ void moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS) { #if defined(THREADED_RTS) - uint32_t i; - Capability **old_capabilities = capabilities; + Capability **new_capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities"); - capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities"); + // We must disable the timer while we do this since the tick handler may + // call contextSwitchAllCapabilities, which may see the capabilities array + // as we free it. The alternative would be to protect the capabilities + // array with a lock but this seems more expensive than necessary. + // See #17289. + stopTimer(); if (to == 1) { // THREADED_RTS must work on builds that don't have a mutable // BaseReg (eg. unregisterised), so in this case // capabilities[0] must coincide with &MainCapability. - capabilities[0] = &MainCapability; + new_capabilities[0] = &MainCapability; initCapability(&MainCapability, 0); } else { - for (i = 0; i < to; i++) { + for (uint32_t i = 0; i < to; i++) { if (i < from) { - capabilities[i] = old_capabilities[i]; + new_capabilities[i] = capabilities[i]; } else { - capabilities[i] = stgMallocBytes(sizeof(Capability), - "moreCapabilities"); - initCapability(capabilities[i], i); + new_capabilities[i] = stgMallocBytes(sizeof(Capability), + "moreCapabilities"); + initCapability(new_capabilities[i], i); } } } debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from); + Capability **old_capabilities = ACQUIRE_LOAD(&capabilities); + RELEASE_STORE(&capabilities, new_capabilities); if (old_capabilities != NULL) { stgFree(old_capabilities); } + + startTimer(); #endif } @@ -505,6 +519,9 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task) * The current Task (cap->task) releases the Capability. The Capability is * marked free, and if there is any work to do, an appropriate Task is woken up. * + * The caller must hold cap->lock and will still hold it after + * releaseCapability returns. + * * N.B. May need to take all_tasks_mutex. * * ------------------------------------------------------------------------- */ @@ -520,8 +537,9 @@ releaseCapability_ (Capability* cap, ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task); ASSERT_RETURNING_TASKS(cap,task); + ASSERT_LOCK_HELD(&cap->lock); - cap->running_task = NULL; + RELAXED_STORE(&cap->running_task, NULL); // Check to see whether a worker thread can be given // the go-ahead to return the result of an external call.. @@ -540,7 +558,7 @@ releaseCapability_ (Capability* cap, // be currently in waitForCapability() waiting for this // capability, in which case simply setting it as free would not // wake up the waiting task. - PendingSync *sync = pending_sync; + PendingSync *sync = SEQ_CST_LOAD(&pending_sync); if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) { debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no); return; @@ -564,7 +582,7 @@ releaseCapability_ (Capability* cap, // is interrupted, we only create a worker task if there // are threads that need to be completed. If the system is // shutting down, we never create a new worker. - if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { + if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) { debugTrace(DEBUG_sched, "starting new worker on capability %d", cap->no); startWorkerTask(cap); @@ -587,7 +605,7 @@ releaseCapability_ (Capability* cap, #if defined(PROFILING) cap->r.rCCCS = CCS_IDLE; #endif - last_free_capability[cap->node] = cap; + RELAXED_STORE(&last_free_capability[cap->node], cap); debugTrace(DEBUG_sched, "freeing capability %d", cap->no); } @@ -639,6 +657,36 @@ enqueueWorker (Capability* cap USED_IF_THREADS) #endif +/* + * Note [Benign data race due to work-pushing] + * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + * + * #17276 points out a tricky data race (noticed by ThreadSanitizer) between + * waitForWorkerCapability and schedulePushWork. In short, schedulePushWork + * works as follows: + * + * 1. collect the set of all idle capabilities, take cap->lock of each. + * + * 2. sort through each TSO on the calling capability's run queue and push + * some to idle capabilities. This may (if the TSO is a bound thread) + * involve setting tso->bound->task->cap despite not holding + * tso->bound->task->lock. + * + * 3. release cap->lock of all idle capabilities. + * + * Now, step 2 is in principle safe since the capability of the caller of + * schedulePushWork *owns* the TSO and therefore the Task to which it is bound. + * Furthermore, step 3 ensures that the write in step (2) will be visible to + * any core which starts execution of the previously-idle capability. + * + * However, this argument doesn't quite work for waitForWorkerCapability, which + * reads task->cap *without* first owning the capability which owns `task`. + * For this reason, we check again whether the task has been migrated to + * another capability after taking task->cap->lock. See Note [migrated bound + * threads] above. + * + */ + /* ---------------------------------------------------------------------------- * waitForWorkerCapability(task) * @@ -656,7 +704,13 @@ static Capability * waitForWorkerCapability (Task *task) ACQUIRE_LOCK(&task->lock); // task->lock held, cap->lock not held if (!task->wakeup) waitCondition(&task->cond, &task->lock); + // The happens-after matches the happens-before in + // schedulePushWork, which does owns 'task' when it sets 'task->cap'. + TSAN_ANNOTATE_HAPPENS_AFTER(&task->cap); cap = task->cap; + + // See Note [Benign data race due to work-pushing]. + TSAN_ANNOTATE_BENIGN_RACE(&task->cap, "we will double-check this below"); task->wakeup = false; RELEASE_LOCK(&task->lock); @@ -692,7 +746,7 @@ static Capability * waitForWorkerCapability (Task *task) cap->n_spare_workers--; } - cap->running_task = task; + RELAXED_STORE(&cap->running_task, task); RELEASE_LOCK(&cap->lock); break; } @@ -733,7 +787,7 @@ static Capability * waitForReturnCapability (Task *task) RELEASE_LOCK(&cap->lock); continue; } - cap->running_task = task; + RELAXED_STORE(&cap->running_task, task); popReturningTask(cap); RELEASE_LOCK(&cap->lock); break; @@ -746,6 +800,65 @@ static Capability * waitForReturnCapability (Task *task) #endif /* THREADED_RTS */ +#if defined(THREADED_RTS) + +/* ---------------------------------------------------------------------------- + * capability_is_busy (Capability *cap) + * + * A predicate for determining whether the given Capability is currently running + * a Task. This can be safely called without holding the Capability's lock + * although the result may be inaccurate if it races with the scheduler. + * Consequently there is a TSAN suppression for it. + * + * ------------------------------------------------------------------------- */ +static bool capability_is_busy(const Capability * cap) +{ + return RELAXED_LOAD(&cap->running_task) != NULL; +} + + +/* ---------------------------------------------------------------------------- + * find_capability_for_task + * + * Given a Task, identify a reasonable Capability to run it on. We try to + * find an idle capability if possible. + * + * ------------------------------------------------------------------------- */ + +static Capability * find_capability_for_task(const Task * task) +{ + if (task->preferred_capability != -1) { + // Does the task have a preferred capability? If so, use it + return capabilities[task->preferred_capability % + enabled_capabilities]; + } else { + // Try last_free_capability first + Capability *cap = RELAXED_LOAD(&last_free_capability[task->node]); + + // N.B. There is a data race here since we are loking at + // cap->running_task without taking cap->lock. However, this is + // benign since the result is merely guiding our search heuristic. + if (!capability_is_busy(cap)) { + return cap; + } else { + // The last_free_capability is already busy, search for a free + // capability on this node. + for (uint32_t i = task->node; i < enabled_capabilities; + i += n_numa_nodes) { + // visits all the capabilities on this node, because + // cap[i]->node == i % n_numa_nodes + if (!RELAXED_LOAD(&capabilities[i]->running_task)) { + return capabilities[i]; + } + } + + // Can't find a free one, use last_free_capability. + return RELAXED_LOAD(&last_free_capability[task->node]); + } + } +} +#endif /* THREADED_RTS */ + /* ---------------------------------------------------------------------------- * waitForCapability (Capability **pCap, Task *task) * @@ -768,38 +881,13 @@ void waitForCapability (Capability **pCap, Task *task) *pCap = &MainCapability; #else - uint32_t i; Capability *cap = *pCap; if (cap == NULL) { - if (task->preferred_capability != -1) { - cap = capabilities[task->preferred_capability % - enabled_capabilities]; - } else { - // Try last_free_capability first - cap = last_free_capability[task->node]; - if (cap->running_task) { - // Otherwise, search for a free capability on this node. - cap = NULL; - for (i = task->node; i < enabled_capabilities; - i += n_numa_nodes) { - // visits all the capabilities on this node, because - // cap[i]->node == i % n_numa_nodes - if (!capabilities[i]->running_task) { - cap = capabilities[i]; - break; - } - } - if (cap == NULL) { - // Can't find a free one, use last_free_capability. - cap = last_free_capability[task->node]; - } - } - } + cap = find_capability_for_task(task); // record the Capability as the one this Task is now associated with. task->cap = cap; - } else { ASSERT(task->cap == cap); } @@ -809,7 +897,7 @@ void waitForCapability (Capability **pCap, Task *task) ACQUIRE_LOCK(&cap->lock); if (!cap->running_task) { // It's free; just grab it - cap->running_task = task; + RELAXED_STORE(&cap->running_task, task); RELEASE_LOCK(&cap->lock); } else { newReturningTask(cap,task); @@ -865,7 +953,7 @@ yieldCapability (Capability** pCap, Task *task, bool gcAllowed) if (gcAllowed) { - PendingSync *sync = pending_sync; + PendingSync *sync = SEQ_CST_LOAD(&pending_sync); if (sync) { switch (sync->type) { @@ -936,33 +1024,41 @@ yieldCapability (Capability** pCap, Task *task, bool gcAllowed) #endif /* THREADED_RTS */ -// Note [migrated bound threads] -// -// There's a tricky case where: -// - cap A is running an unbound thread T1 -// - there is a bound thread T2 at the head of the run queue on cap A -// - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A -// - T1 returns quickly grabbing A again (T2 is still waking up on A) -// - T1 blocks, the scheduler migrates T2 to cap B -// - the task bound to T2 wakes up on cap B -// -// We take advantage of the following invariant: -// -// - A bound thread can only be migrated by the holder of the -// Capability on which the bound thread currently lives. So, if we -// hold Capability C, and task->cap == C, then task cannot be -// migrated under our feet. - -// Note [migrated bound threads 2] -// -// Second tricky case; -// - A bound Task becomes a GC thread -// - scheduleDoGC() migrates the thread belonging to this Task, -// because the Capability it is on is disabled -// - after GC, gcWorkerThread() returns, but now we are -// holding a Capability that is not the same as task->cap -// - Hence we must check for this case and immediately give up the -// cap we hold. +/* + * Note [migrated bound threads] + * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + * + * There's a tricky case where: + * - cap A is running an unbound thread T1 + * - there is a bound thread T2 at the head of the run queue on cap A + * - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A + * - T1 returns quickly grabbing A again (T2 is still waking up on A) + * - T1 blocks, the scheduler migrates T2 to cap B + * - the task bound to T2 wakes up on cap B + * + * We take advantage of the following invariant: + * + * - A bound thread can only be migrated by the holder of the + * Capability on which the bound thread currently lives. So, if we + * hold Capability C, and task->cap == C, then task cannot be + * migrated under our feet. + * + * See also Note [Benign data race due to work-pushing]. + * + * + * Note [migrated bound threads 2] + * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + * + * Second tricky case; + * - A bound Task becomes a GC thread + * - scheduleDoGC() migrates the thread belonging to this Task, + * because the Capability it is on is disabled + * - after GC, gcWorkerThread() returns, but now we are + * holding a Capability that is not the same as task->cap + * - Hence we must check for this case and immediately give up the + * cap we hold. + * + */ /* ---------------------------------------------------------------------------- * prodCapability @@ -999,7 +1095,10 @@ bool tryGrabCapability (Capability *cap, Task *task) { int r; - if (cap->running_task != NULL) return false; + // N.B. This is benign as we will check again after taking the lock. + TSAN_ANNOTATE_BENIGN_RACE(&cap->running_task, "tryGrabCapability (cap->running_task)"); + if (RELAXED_LOAD(&cap->running_task) != NULL) return false; + r = TRY_ACQUIRE_LOCK(&cap->lock); if (r != 0) return false; if (cap->running_task != NULL) { @@ -1007,7 +1106,7 @@ tryGrabCapability (Capability *cap, Task *task) return false; } task->cap = cap; - cap->running_task = task; + RELAXED_STORE(&cap->running_task, task); RELEASE_LOCK(&cap->lock); return true; } @@ -1256,7 +1355,7 @@ void setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) { #if defined(THREADED_RTS) if (cap_no < n_capabilities) { - capabilities[cap_no]->io_manager_control_wr_fd = fd; + RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd); } else { errorBelch("warning: setIOManagerControlFd called with illegal capability number."); } diff --git a/rts/Capability.h b/rts/Capability.h index 87ec53801a..8c5b1e814e 100644 --- a/rts/Capability.h +++ b/rts/Capability.h @@ -182,10 +182,10 @@ struct Capability_ { #endif // These properties should be true when a Task is holding a Capability -#define ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task) \ - ASSERT(cap->running_task != NULL && cap->running_task == task); \ - ASSERT(task->cap == cap); \ - ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task) +#define ASSERT_FULL_CAPABILITY_INVARIANTS(_cap,_task) \ + ASSERT(_cap->running_task != NULL && _cap->running_task == _task); \ + ASSERT(_task->cap == _cap); \ + ASSERT_PARTIAL_CAPABILITY_INVARIANTS(_cap,_task) // This assert requires cap->lock to be held, so it can't be part of // ASSERT_PARTIAL_CAPABILITY_INVARIANTS() @@ -419,14 +419,16 @@ recordMutableCap (const StgClosure *p, Capability *cap, uint32_t gen) // ASSERT(cap->running_task == myTask()); // NO: assertion is violated by performPendingThrowTos() bd = cap->mut_lists[gen]; - if (bd->free >= bd->start + BLOCK_SIZE_W) { + if (RELAXED_LOAD(&bd->free) >= bd->start + BLOCK_SIZE_W) { bdescr *new_bd; new_bd = allocBlockOnNode_lock(cap->node); new_bd->link = bd; + new_bd->free = new_bd->start; bd = new_bd; cap->mut_lists[gen] = bd; } - *bd->free++ = (StgWord)p; + RELAXED_STORE(bd->free, (StgWord) p); + NONATOMIC_ADD(&bd->free, 1); } EXTERN_INLINE void @@ -460,29 +462,33 @@ stopCapability (Capability *cap) // It may not work - the thread might be updating HpLim itself // at the same time - so we also have the context_switch/interrupted // flags as a sticky way to tell the thread to stop. - cap->r.rHpLim = NULL; + TSAN_ANNOTATE_BENIGN_RACE(&cap->r.rHpLim, "stopCapability"); + SEQ_CST_STORE(&cap->r.rHpLim, NULL); } INLINE_HEADER void interruptCapability (Capability *cap) { stopCapability(cap); - cap->interrupt = 1; + SEQ_CST_STORE(&cap->interrupt, true); } INLINE_HEADER void contextSwitchCapability (Capability *cap) { stopCapability(cap); - cap->context_switch = 1; + SEQ_CST_STORE(&cap->context_switch, true); } #if defined(THREADED_RTS) INLINE_HEADER bool emptyInbox(Capability *cap) { - return (cap->inbox == (Message*)END_TSO_QUEUE && - cap->putMVars == NULL); + // This may race with writes to putMVars and inbox but this harmless for the + // intended uses of this function. + TSAN_ANNOTATE_BENIGN_RACE(&cap->putMVars, "emptyInbox(cap->putMVars)"); + return (RELAXED_LOAD(&cap->inbox) == (Message*)END_TSO_QUEUE && + RELAXED_LOAD(&cap->putMVars) == NULL); } #endif diff --git a/rts/Messages.c b/rts/Messages.c index 2f80370845..285ca5be63 100644 --- a/rts/Messages.c +++ b/rts/Messages.c @@ -39,7 +39,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg) #endif msg->link = to_cap->inbox; - to_cap->inbox = msg; + RELAXED_STORE(&to_cap->inbox, msg); recordClosureMutated(from_cap,(StgClosure*)msg); @@ -68,8 +68,7 @@ executeMessage (Capability *cap, Message *m) const StgInfoTable *i; loop: - write_barrier(); // allow m->header to be modified by another thread - i = m->header.info; + i = ACQUIRE_LOAD(&m->header.info); if (i == &stg_MSG_TRY_WAKEUP_info) { StgTSO *tso = ((MessageWakeup *)m)->tso; @@ -127,7 +126,7 @@ loop: else if (i == &stg_WHITEHOLE_info) { #if defined(PROF_SPIN) - ++whitehole_executeMessage_spin; + NONATOMIC_ADD(&whitehole_executeMessage_spin, 1); #endif goto loop; } @@ -169,8 +168,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg) debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on " "blackhole %p", (W_)msg->tso->id, msg->bh); - info = bh->header.info; - load_load_barrier(); // See Note [Heap memory barriers] in SMP.h + info = ACQUIRE_LOAD(&bh->header.info); // If we got this message in our inbox, it might be that the // BLACKHOLE has already been updated, and GC has shorted out the @@ -190,11 +188,11 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg) // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND, // or a value. loop: - // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load - // and turns this into an infinite loop. - p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee)); - info = p->header.info; - load_load_barrier(); // See Note [Heap memory barriers] in SMP.h + // If we are being called from stg_BLACKHOLE then TSAN won't know about the + // previous read barrier that makes the following access safe. + TSAN_ANNOTATE_BENIGN_RACE(&((StgInd*)bh)->indirectee, "messageBlackHole"); + p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee)); + info = RELAXED_LOAD(&p->header.info); if (info == &stg_IND_info) { @@ -240,9 +238,8 @@ loop: // We are about to make the newly-constructed message visible to other cores; // a barrier is necessary to ensure that all writes are visible. // See Note [Heap memory barriers] in SMP.h. - write_barrier(); dirty_TSO(cap, owner); // we will modify owner->bq - owner->bq = bq; + RELEASE_STORE(&owner->bq, bq); // If the owner of the blackhole is currently runnable, then // bump it to the front of the run queue. This gives the @@ -258,11 +255,11 @@ loop: } // point to the BLOCKING_QUEUE from the BLACKHOLE - write_barrier(); // make the BQ visible, see Note [Heap memory barriers]. + // RELEASE to make the BQ visible, see Note [Heap memory barriers]. + RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq); IF_NONMOVING_WRITE_BARRIER_ENABLED { updateRemembSetPushClosure(cap, (StgClosure*)p); } - ((StgInd*)bh)->indirectee = (StgClosure *)bq; recordClosureMutated(cap,bh); // bh was mutated debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d", @@ -295,14 +292,14 @@ loop: // makes it into the update remembered set updateRemembSetPushClosure(cap, (StgClosure*)bq->queue); } - msg->link = bq->queue; + RELAXED_STORE(&msg->link, bq->queue); bq->queue = msg; // No barrier is necessary here: we are only exposing the // closure to the GC. See Note [Heap memory barriers] in SMP.h. recordClosureMutated(cap,(StgClosure*)msg); if (info == &stg_BLOCKING_QUEUE_CLEAN_info) { - bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info); // No barrier is necessary here: we are only exposing the // closure to the GC. See Note [Heap memory barriers] in SMP.h. recordClosureMutated(cap,(StgClosure*)bq); @@ -333,7 +330,7 @@ StgTSO * blackHoleOwner (StgClosure *bh) const StgInfoTable *info; StgClosure *p; - info = bh->header.info; + info = RELAXED_LOAD(&bh->header.info); if (info != &stg_BLACKHOLE_info && info != &stg_CAF_BLACKHOLE_info && @@ -345,10 +342,8 @@ StgTSO * blackHoleOwner (StgClosure *bh) // The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND, // or a value. loop: - // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load - // and turns this into an infinite loop. - p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee)); - info = p->header.info; + p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee)); + info = RELAXED_LOAD(&p->header.info); if (info == &stg_IND_info) goto loop; @@ -360,7 +355,7 @@ loop: info == &stg_BLOCKING_QUEUE_DIRTY_info) { StgBlockingQueue *bq = (StgBlockingQueue *)p; - return bq->owner; + return RELAXED_LOAD(&bq->owner); } return NULL; // not blocked diff --git a/rts/Proftimer.c b/rts/Proftimer.c index 68a73a5446..24f82ead6d 100644 --- a/rts/Proftimer.c +++ b/rts/Proftimer.c @@ -30,7 +30,7 @@ void stopProfTimer( void ) { #if defined(PROFILING) - do_prof_ticks = false; + RELAXED_STORE(&do_prof_ticks, false); #endif } @@ -38,14 +38,14 @@ void startProfTimer( void ) { #if defined(PROFILING) - do_prof_ticks = true; + RELAXED_STORE(&do_prof_ticks, true); #endif } void stopHeapProfTimer( void ) { - do_heap_prof_ticks = false; + RELAXED_STORE(&do_heap_prof_ticks, false); } void @@ -74,7 +74,7 @@ handleProfTick(void) { #if defined(PROFILING) total_ticks++; - if (do_prof_ticks) { + if (RELAXED_LOAD(&do_prof_ticks)) { uint32_t n; for (n=0; n < n_capabilities; n++) { capabilities[n]->r.rCCCS->time_ticks++; @@ -83,7 +83,7 @@ handleProfTick(void) } #endif - if (do_heap_prof_ticks) { + if (RELAXED_LOAD(&do_heap_prof_ticks)) { ticks_to_heap_profile--; if (ticks_to_heap_profile <= 0) { ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks; diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c index 719c05435d..a3593fe7a6 100644 --- a/rts/RaiseAsync.c +++ b/rts/RaiseAsync.c @@ -232,7 +232,7 @@ uint32_t throwToMsg (Capability *cap, MessageThrowTo *msg) { StgWord status; - StgTSO *target = msg->target; + StgTSO *target = ACQUIRE_LOAD(&msg->target); Capability *target_cap; goto check_target; @@ -245,8 +245,9 @@ check_target: ASSERT(target != END_TSO_QUEUE); // Thread already dead? - if (target->what_next == ThreadComplete - || target->what_next == ThreadKilled) { + StgWord16 what_next = SEQ_CST_LOAD(&target->what_next); + if (what_next == ThreadComplete + || what_next == ThreadKilled) { return THROWTO_SUCCESS; } @@ -988,7 +989,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception, sp[0] = (W_)raise; sp[-1] = (W_)&stg_enter_info; stack->sp = sp-1; - tso->what_next = ThreadRunGHC; + RELAXED_STORE(&tso->what_next, ThreadRunGHC); goto done; } diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c index a20dc3d04a..5e2495844c 100644 --- a/rts/RtsStartup.c +++ b/rts/RtsStartup.c @@ -285,6 +285,13 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config) /* Initialise libdw session pool */ libdwPoolInit(); + /* Start the "ticker" and profiling timer but don't start until the + * scheduler is up. However, the ticker itself needs to be initialized + * before the scheduler to ensure that the ticker mutex is initialized as + * moreCapabilities will attempt to acquire it. + */ + initTimer(); + /* initialise scheduler data structures (needs to be done before * initStorage()). */ @@ -366,7 +373,6 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config) initHeapProfiling(); /* start the virtual timer 'subsystem'. */ - initTimer(); startTimer(); #if defined(RTS_USER_SIGNALS) diff --git a/rts/SMPClosureOps.h b/rts/SMPClosureOps.h index c73821a782..2df88db06c 100644 --- a/rts/SMPClosureOps.h +++ b/rts/SMPClosureOps.h @@ -62,12 +62,12 @@ EXTERN_INLINE StgInfoTable *reallyLockClosure(StgClosure *p) info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info); if (info != (W_)&stg_WHITEHOLE_info) return (StgInfoTable *)info; #if defined(PROF_SPIN) - ++whitehole_lockClosure_spin; + NONATOMIC_ADD(&whitehole_lockClosure_spin, 1); #endif busy_wait_nop(); } while (++i < SPIN_COUNT); #if defined(PROF_SPIN) - ++whitehole_lockClosure_yield; + NONATOMIC_ADD(&whitehole_lockClosure_yield, 1); #endif yieldThread(); } while (1); @@ -119,9 +119,8 @@ tryLockClosure(StgClosure *p) EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info) { - // This is a strictly ordered write, so we need a write_barrier(): - write_barrier(); - p->header.info = info; + // This is a strictly ordered write, so we need a RELEASE ordering. + RELEASE_STORE(&p->header.info, info); } #endif /* CMINUSMINUS */ @@ -187,7 +187,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED, StgTVar *s STG_UNUSED) { StgClosure *result; TRACE("%p : lock_tvar(%p)", trec, s); - result = s -> current_value; + result = SEQ_CST_LOAD(&s->current_value); return result; } @@ -198,8 +198,8 @@ static void unlock_tvar(Capability *cap, StgBool force_update) { TRACE("%p : unlock_tvar(%p)", trec, s); if (force_update) { - StgClosure *old_value = s -> current_value; - s -> current_value = c; + StgClosure *old_value = SEQ_CST_LOAD(&s->current_value); + RELEASE_STORE(&s->current_value, c); dirty_TVAR(cap, s, old_value); } } @@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED, StgClosure *expected) { StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); - result = s -> current_value; + result = SEQ_CST_LOAD(&s->current_value); TRACE("%p : %s", trec, (result == expected) ? "success" : "failure"); return (result == expected); } @@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) { static void unlock_stm(StgTRecHeader *trec STG_UNUSED) { TRACE("%p : unlock_stm()", trec); ASSERT(smp_locked == trec); - smp_locked = 0; + SEQ_CST_STORE(&smp_locked, 0); } static StgClosure *lock_tvar(Capability *cap STG_UNUSED, @@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED, StgClosure *result; TRACE("%p : lock_tvar(%p)", trec, s); ASSERT(smp_locked == trec); - result = s -> current_value; + result = SEQ_CST_LOAD(&s->current_value); return result; } @@ -252,8 +252,8 @@ static void *unlock_tvar(Capability *cap, TRACE("%p : unlock_tvar(%p, %p)", trec, s, c); ASSERT(smp_locked == trec); if (force_update) { - StgClosure *old_value = s -> current_value; - s -> current_value = c; + StgClosure *old_value = SEQ_CST_LOAD(&s->current_value); + RELEASE_STORE(&s->current_value, c); dirty_TVAR(cap, s, old_value); } } @@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED, StgClosure *result; TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected); ASSERT(smp_locked == trec); - result = s -> current_value; + result = SEQ_CST_LOAD(&s->current_value); TRACE("%p : %d", result ? "success" : "failure"); return (result == expected); } @@ -292,9 +292,9 @@ static StgClosure *lock_tvar(Capability *cap, TRACE("%p : lock_tvar(%p)", trec, s); do { do { - result = s -> current_value; + result = SEQ_CST_LOAD(&s->current_value); } while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info); - } while (cas((void *)&(s -> current_value), + } while (cas((void *) &s->current_value, (StgWord)result, (StgWord)trec) != (StgWord)result); @@ -311,8 +311,8 @@ static void unlock_tvar(Capability *cap, StgClosure *c, StgBool force_update STG_UNUSED) { TRACE("%p : unlock_tvar(%p, %p)", trec, s, c); - ASSERT(s -> current_value == (StgClosure *)trec); - s -> current_value = c; + ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec); + RELEASE_STORE(&s->current_value, c); dirty_TVAR(cap, s, (StgClosure *) trec); } @@ -375,7 +375,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) { StgTVarWatchQueue *trail; TRACE("unpark_waiters_on tvar=%p", s); // unblock TSOs in reverse order, to be a bit fairer (#2319) - for (q = s -> first_watch_queue_entry, trail = q; + for (q = SEQ_CST_LOAD(&s->first_watch_queue_entry), trail = q; q != END_STM_WATCH_QUEUE; q = q -> next_queue_entry) { trail = q; @@ -532,16 +532,16 @@ static void build_watch_queue_entries_for_trec(Capability *cap, StgTVarWatchQueue *fq; s = e -> tvar; TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); - NACQ_ASSERT(s -> current_value == e -> expected_value); - fq = s -> first_watch_queue_entry; + ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec); + NACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == e -> expected_value); + fq = SEQ_CST_LOAD(&s->first_watch_queue_entry); q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso); q -> next_queue_entry = fq; q -> prev_queue_entry = END_STM_WATCH_QUEUE; if (fq != END_STM_WATCH_QUEUE) { fq -> prev_queue_entry = q; } - s -> first_watch_queue_entry = q; + SEQ_CST_STORE(&s->first_watch_queue_entry, q); e -> new_value = (StgClosure *) q; dirty_TVAR(cap, s, (StgClosure *) fq); // we modified first_watch_queue_entry }); @@ -569,7 +569,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap, trec, q -> closure, s); - ACQ_ASSERT(s -> current_value == (StgClosure *)trec); + ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec); nq = q -> next_queue_entry; pq = q -> prev_queue_entry; if (nq != END_STM_WATCH_QUEUE) { @@ -578,8 +578,8 @@ static void remove_watch_queue_entries_for_trec(Capability *cap, if (pq != END_STM_WATCH_QUEUE) { pq -> next_queue_entry = nq; } else { - ASSERT(s -> first_watch_queue_entry == q); - s -> first_watch_queue_entry = nq; + ASSERT(SEQ_CST_LOAD(&s->first_watch_queue_entry) == q); + SEQ_CST_STORE(&s->first_watch_queue_entry, nq); dirty_TVAR(cap, s, (StgClosure *) q); // we modified first_watch_queue_entry } free_stg_tvar_watch_queue(cap, q); @@ -727,7 +727,7 @@ static StgBool entry_is_read_only(TRecEntry *e) { static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) { StgClosure *c; StgBool result; - c = s -> current_value; + c = SEQ_CST_LOAD(&s->current_value); result = (c == (StgClosure *) h); return result; } @@ -800,13 +800,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap, ASSERT(config_use_read_phase); IF_STM_FG_LOCKS({ TRACE("%p : will need to check %p", trec, s); - if (s -> current_value != e -> expected_value) { + // The memory ordering here must ensure that we have two distinct + // reads to current_value, with the read from num_updates between + // them. + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match", trec); result = false; BREAK_FOR_EACH; } - e -> num_updates = s -> num_updates; - if (s -> current_value != e -> expected_value) { + e->num_updates = SEQ_CST_LOAD(&s->num_updates); + if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) { TRACE("%p : doesn't match (race)", trec); result = false; BREAK_FOR_EACH; @@ -828,7 +831,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap, // check_read_only : check that we've seen an atomic snapshot of the // non-updated TVars accessed by a trec. This checks that the last TRec to // commit an update to the TVar is unchanged since the value was stashed in -// validate_and_acquire_ownership. If no update is seen to any TVar than +// validate_and_acquire_ownership. If no update is seen to any TVar then // all of them contained their expected values at the start of the call to // check_read_only. // @@ -847,11 +850,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) { if (entry_is_read_only(e)) { TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates); + // We must first load current_value then num_updates; this is inverse of + // the order of the stores in stmCommitTransaction. + StgClosure *current_value = SEQ_CST_LOAD(&s->current_value); + StgInt num_updates = SEQ_CST_LOAD(&s->num_updates); + // Note we need both checks and in this order as the TVar could be // locked by another transaction that is committing but has not yet // incremented `num_updates` (See #7815). - if (s -> current_value != e -> expected_value || - s -> num_updates != e -> num_updates) { + if (current_value != e->expected_value || + num_updates != e->num_updates) { TRACE("%p : mismatch", trec); result = false; BREAK_FOR_EACH; @@ -887,17 +895,22 @@ void stmPreGCHook (Capability *cap) { #define TOKEN_BATCH_SIZE 1024 +#if defined(THREADED_RTS) + static volatile StgInt64 max_commits = 0; -#if defined(THREADED_RTS) static volatile StgWord token_locked = false; +static StgInt64 getMaxCommits(void) { + return RELAXED_LOAD(&max_commits); +} + static void getTokenBatch(Capability *cap) { while (cas((void *)&token_locked, false, true) == true) { /* nothing */ } - max_commits += TOKEN_BATCH_SIZE; - TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits); + NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE); + TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits)); cap -> transaction_tokens = TOKEN_BATCH_SIZE; - token_locked = false; + RELEASE_STORE(&token_locked, false); } static void getToken(Capability *cap) { @@ -907,6 +920,10 @@ static void getToken(Capability *cap) { cap -> transaction_tokens --; } #else +static StgInt64 getMaxCommits(void) { + return 0; +} + static void getToken(Capability *cap STG_UNUSED) { // Nothing } @@ -1062,7 +1079,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade /*......................................................................*/ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { - StgInt64 max_commits_at_start = max_commits; + StgInt64 max_commits_at_start = getMaxCommits(); TRACE("%p : stmCommitTransaction()", trec); ASSERT(trec != NO_TREC); @@ -1088,7 +1105,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { result = check_read_only(trec); TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed"); - max_commits_at_end = max_commits; + max_commits_at_end = getMaxCommits(); max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) + (n_capabilities * TOKEN_BATCH_SIZE)); if (((max_concurrent_commits >> 32) > 0) || shake()) { @@ -1113,7 +1130,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) { TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s); unpark_waiters_on(cap,s); IF_STM_FG_LOCKS({ - s -> num_updates ++; + // We have locked the TVar therefore nonatomic addition is sufficient + NONATOMIC_ADD(&s->num_updates, 1); }); unlock_tvar(cap, trec, s, e -> new_value, true); } @@ -1269,12 +1287,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) { static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) { StgClosure *result; - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); #if defined(STM_FG_LOCKS) while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) { TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result); - result = tvar -> current_value; + result = ACQUIRE_LOAD(&tvar->current_value); } #endif diff --git a/rts/Schedule.c b/rts/Schedule.c index 41d0dba953..28849f32e8 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -203,6 +203,7 @@ schedule (Capability *initialCapability, Task *task) // Pre-condition: this task owns initialCapability. // The sched_mutex is *NOT* held // NB. on return, we still hold a capability. + ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task); debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no); @@ -210,6 +211,7 @@ schedule (Capability *initialCapability, Task *task) // Scheduler loop starts here: while (1) { + ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task); // Check whether we have re-entered the RTS from Haskell without // going via suspendThread()/resumeThread (i.e. a 'safe' foreign @@ -258,7 +260,7 @@ schedule (Capability *initialCapability, Task *task) // * We might be left with threads blocked in foreign calls, // we should really attempt to kill these somehow (TODO). - switch (sched_state) { + switch (RELAXED_LOAD(&sched_state)) { case SCHED_RUNNING: break; case SCHED_INTERRUPTING: @@ -270,7 +272,7 @@ schedule (Capability *initialCapability, Task *task) // other Capability did the final GC, or we did it above, // either way we can fall through to the SCHED_SHUTTING_DOWN // case now. - ASSERT(sched_state == SCHED_SHUTTING_DOWN); + ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN); // fall through case SCHED_SHUTTING_DOWN: @@ -372,7 +374,7 @@ schedule (Capability *initialCapability, Task *task) // killed, kill it now. This sometimes happens when a finalizer // thread is created by the final GC, or a thread previously // in a foreign call returns. - if (sched_state >= SCHED_INTERRUPTING && + if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING && !(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) { deleteThread(t); } @@ -403,7 +405,7 @@ schedule (Capability *initialCapability, Task *task) */ if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0 && !emptyThreadQueues(cap)) { - cap->context_switch = 1; + RELAXED_STORE(&cap->context_switch, 1); } run_thread: @@ -430,15 +432,15 @@ run_thread: #endif // reset the interrupt flag before running Haskell code - cap->interrupt = 0; + RELAXED_STORE(&cap->interrupt, false); cap->in_haskell = true; - cap->idle = 0; + RELAXED_STORE(&cap->idle, false); dirty_TSO(cap,t); dirty_STACK(cap,t->stackobj); - switch (recent_activity) + switch (SEQ_CST_LOAD(&recent_activity)) { case ACTIVITY_DONE_GC: { // ACTIVITY_DONE_GC means we turned off the timer signal to @@ -459,7 +461,7 @@ run_thread: // wakeUpRts(). break; default: - recent_activity = ACTIVITY_YES; + SEQ_CST_STORE(&recent_activity, ACTIVITY_YES); } traceEventRunThread(cap, t); @@ -652,8 +654,16 @@ shouldYieldCapability (Capability *cap, Task *task, bool didGcLast) // Capability keeps forcing a GC and the other Capabilities make no // progress at all. - return ((pending_sync && !didGcLast) || - cap->n_returning_tasks != 0 || + // Note [Data race in shouldYieldCapability] + // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + // We would usually need to hold cap->lock to look at n_returning_tasks but + // we don't here since this is just an approximate predicate. We + // consequently need to use atomic accesses whenever touching + // n_returning_tasks. However, since this is an approximate predicate we can + // use a RELAXED ordering. + + return ((RELAXED_LOAD(&pending_sync) && !didGcLast) || + RELAXED_LOAD(&cap->n_returning_tasks) != 0 || (!emptyRunQueue(cap) && (task->incall->tso == NULL ? peekRunQueue(cap)->bound != NULL : peekRunQueue(cap)->bound != task->incall))); @@ -680,7 +690,7 @@ scheduleYield (Capability **pcap, Task *task) if (!shouldYieldCapability(cap,task,false) && (!emptyRunQueue(cap) || !emptyInbox(cap) || - sched_state >= SCHED_INTERRUPTING)) { + RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) { return; } @@ -739,7 +749,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, cap0 = capabilities[i]; if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) { if (!emptyRunQueue(cap0) - || cap0->n_returning_tasks != 0 + || RELAXED_LOAD(&cap0->n_returning_tasks) != 0 || !emptyInbox(cap0)) { // it already has some work, we just grabbed it at // the wrong moment. Or maybe it's deadlocked! @@ -821,7 +831,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS, appendToRunQueue(free_caps[i],t); traceEventMigrateThread (cap, t, free_caps[i]->no); - if (t->bound) { t->bound->task->cap = free_caps[i]; } + // See Note [Benign data race due to work-pushing]. + if (t->bound) { + t->bound->task->cap = free_caps[i]; + } t->cap = free_caps[i]; n--; // we have one fewer threads now i++; // move on to the next free_cap @@ -926,7 +939,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task) * we won't eagerly start a full GC just because we don't have * any threads to run currently. */ - if (recent_activity != ACTIVITY_INACTIVE) return; + if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return; #endif if (task->incall->tso && task->incall->tso->why_blocked == BlockedOnIOCompletion) return; @@ -1127,12 +1140,12 @@ schedulePostRunThread (Capability *cap, StgTSO *t) static bool scheduleHandleHeapOverflow( Capability *cap, StgTSO *t ) { - if (cap->r.rHpLim == NULL || cap->context_switch) { + if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) { // Sometimes we miss a context switch, e.g. when calling // primitives in a tight loop, MAYBE_GC() doesn't check the // context switch flag, and we end up waiting for a GC. // See #1984, and concurrent/should_run/1984 - cap->context_switch = 0; + RELAXED_STORE(&cap->context_switch, 0); appendToRunQueue(cap,t); } else { pushOnRunQueue(cap,t); @@ -1233,8 +1246,8 @@ scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next ) // the CPU because the tick always arrives during GC). This way // penalises threads that do a lot of allocation, but that seems // better than the alternative. - if (cap->context_switch != 0) { - cap->context_switch = 0; + if (RELAXED_LOAD(&cap->context_switch) != 0) { + RELAXED_STORE(&cap->context_switch, 0); appendToRunQueue(cap,t); } else { pushOnRunQueue(cap,t); @@ -1333,7 +1346,7 @@ scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t) if (task->incall->ret) { *(task->incall->ret) = NULL; } - if (sched_state >= SCHED_INTERRUPTING) { + if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) { if (heap_overflow) { task->incall->rstat = HeapExhausted; } else { @@ -1478,7 +1491,7 @@ static bool requestSync ( sync->type); ASSERT(*pcap); yieldCapability(pcap,task,true); - sync = pending_sync; + sync = SEQ_CST_LOAD(&pending_sync); } while (sync != NULL); } @@ -1509,7 +1522,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task) Capability *tmpcap; uint32_t i; - ASSERT(pending_sync != NULL); + ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL); for (i=0; i < n_capabilities; i++) { debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)", i, n_capabilities); @@ -1581,7 +1594,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS, // cycle. #endif - if (sched_state == SCHED_SHUTTING_DOWN) { + if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) { // The final GC has already been done, and the system is // shutting down. We'll probably deadlock if we try to GC // now. @@ -1596,7 +1609,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS, major_gc = (collect_gen == RtsFlags.GcFlags.generations-1); #if defined(THREADED_RTS) - if (sched_state < SCHED_INTERRUPTING + if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING && RtsFlags.ParFlags.parGcEnabled && collect_gen >= RtsFlags.ParFlags.parGcGen && ! oldest_gen->mark) @@ -1689,7 +1702,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS, } if (was_syncing && (prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) && - !(sched_state == SCHED_INTERRUPTING && force_major)) { + !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) { // someone else had a pending sync request for a GC, so // let's assume GC has been done and we don't need to GC // again. @@ -1697,7 +1710,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS, // need to do the final GC. return; } - if (sched_state == SCHED_SHUTTING_DOWN) { + if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) { // The scheduler might now be shutting down. We tested // this above, but it might have become true since then as // we yielded the capability in requestSync(). @@ -1780,7 +1793,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS, debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps); for (i=0; i < n_capabilities; i++) { - capabilities[i]->idle++; + NONATOMIC_ADD(&capabilities[i]->idle, 1); } // For all capabilities participating in this GC, wait until @@ -1802,7 +1815,7 @@ delete_threads_and_gc: * threads in the system. * Checking for major_gc ensures that the last GC is major. */ - if (sched_state == SCHED_INTERRUPTING && major_gc) { + if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) { deleteAllThreads(); #if defined(THREADED_RTS) // Discard all the sparks from every Capability. Why? @@ -1816,7 +1829,7 @@ delete_threads_and_gc: discardSparksCap(capabilities[i]); } #endif - sched_state = SCHED_SHUTTING_DOWN; + RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN); } /* @@ -1861,20 +1874,20 @@ delete_threads_and_gc: #endif // If we're shutting down, don't leave any idle GC work to do. - if (sched_state == SCHED_SHUTTING_DOWN) { + if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) { doIdleGCWork(cap, true /* all of it */); } traceSparkCounters(cap); - switch (recent_activity) { + switch (SEQ_CST_LOAD(&recent_activity)) { case ACTIVITY_INACTIVE: if (force_major) { // We are doing a GC because the system has been idle for a // timeslice and we need to check for deadlock. Record the // fact that we've done a GC and turn off the timer signal; // it will get re-enabled if we run any threads after the GC. - recent_activity = ACTIVITY_DONE_GC; + SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC); #if !defined(PROFILING) stopTimer(); #endif @@ -1886,7 +1899,7 @@ delete_threads_and_gc: // the GC might have taken long enough for the timer to set // recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE, // but we aren't necessarily deadlocked: - recent_activity = ACTIVITY_YES; + SEQ_CST_STORE(&recent_activity, ACTIVITY_YES); break; case ACTIVITY_DONE_GC: @@ -1936,7 +1949,7 @@ delete_threads_and_gc: releaseGCThreads(cap, idle_cap); } #endif - if (heap_overflow && sched_state == SCHED_RUNNING) { + if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) { // GC set the heap_overflow flag. We should throw an exception if we // can, or shut down otherwise. @@ -1948,7 +1961,7 @@ delete_threads_and_gc: // shutdown now. Ultimately we want the main thread to return to // its caller with HeapExhausted, at which point the caller should // call hs_exit(). The first step is to delete all the threads. - sched_state = SCHED_INTERRUPTING; + RELAXED_STORE(&sched_state, SCHED_INTERRUPTING); goto delete_threads_and_gc; } @@ -2027,12 +2040,14 @@ forkProcess(HsStablePtr *entry ACQUIRE_LOCK(&sm_mutex); ACQUIRE_LOCK(&stable_ptr_mutex); ACQUIRE_LOCK(&stable_name_mutex); - ACQUIRE_LOCK(&task->lock); for (i=0; i < n_capabilities; i++) { ACQUIRE_LOCK(&capabilities[i]->lock); } + // Take task lock after capability lock to avoid order inversion (#17275). + ACQUIRE_LOCK(&task->lock); + #if defined(THREADED_RTS) ACQUIRE_LOCK(&all_tasks_mutex); #endif @@ -2236,6 +2251,12 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS) cap = rts_lock(); task = cap->running_task; + + // N.B. We must stop the interval timer while we are changing the + // capabilities array lest handle_tick may try to context switch + // an old capability. See #17289. + stopTimer(); + stopAllCapabilities(&cap, task); if (new_n_capabilities < enabled_capabilities) @@ -2318,6 +2339,8 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS) // Notify IO manager that the number of capabilities has changed. rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL); + startTimer(); + rts_unlock(cap); #endif // THREADED_RTS @@ -2626,8 +2649,9 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap) #if defined(THREADED_RTS) void scheduleWorker (Capability *cap, Task *task) { - // schedule() runs without a lock. + ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task); cap = schedule(cap,task); + ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task); // On exit from schedule(), we have a Capability, but possibly not // the same one we started with. @@ -2687,7 +2711,7 @@ initScheduler(void) #endif sched_state = SCHED_RUNNING; - recent_activity = ACTIVITY_YES; + SEQ_CST_STORE(&recent_activity, ACTIVITY_YES); /* Initialise the mutex and condition variables used by @@ -2729,8 +2753,8 @@ exitScheduler (bool wait_foreign USED_IF_THREADS) Task *task = newBoundTask(); // If we haven't killed all the threads yet, do it now. - if (sched_state < SCHED_SHUTTING_DOWN) { - sched_state = SCHED_INTERRUPTING; + if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) { + RELAXED_STORE(&sched_state, SCHED_INTERRUPTING); nonmovingStop(); Capability *cap = task->cap; waitForCapability(&cap,task); diff --git a/rts/Schedule.h b/rts/Schedule.h index a550a6763a..4c692842e7 100644 --- a/rts/Schedule.h +++ b/rts/Schedule.h @@ -184,10 +184,13 @@ popRunQueue (Capability *cap) StgTSO *t = cap->run_queue_hd; ASSERT(t != END_TSO_QUEUE); cap->run_queue_hd = t->_link; - if (t->_link != END_TSO_QUEUE) { - t->_link->block_info.prev = END_TSO_QUEUE; + + StgTSO *link = RELAXED_LOAD(&t->_link); + if (link != END_TSO_QUEUE) { + link->block_info.prev = END_TSO_QUEUE; } - t->_link = END_TSO_QUEUE; // no write barrier req'd + RELAXED_STORE(&t->_link, END_TSO_QUEUE); // no write barrier req'd + if (cap->run_queue_hd == END_TSO_QUEUE) { cap->run_queue_tl = END_TSO_QUEUE; } @@ -230,12 +233,18 @@ emptyQueue (StgTSO *q) INLINE_HEADER bool emptyRunQueue(Capability *cap) { + // Can only be called by the task owning the capability. + TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "emptyRunQueue"); return cap->n_run_queue == 0; } INLINE_HEADER void truncateRunQueue(Capability *cap) { + // Can only be called by the task owning the capability. + TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_hd, "truncateRunQueue"); + TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_tl, "truncateRunQueue"); + TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "truncateRunQueue"); cap->run_queue_hd = END_TSO_QUEUE; cap->run_queue_tl = END_TSO_QUEUE; cap->n_run_queue = 0; diff --git a/rts/Sparks.c b/rts/Sparks.c index 2012b0682b..47cf310188 100644 --- a/rts/Sparks.c +++ b/rts/Sparks.c @@ -92,7 +92,7 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) SparkPool *pool; StgClosurePtr spark, tmp, *elements; uint32_t n, pruned_sparks; // stats only - StgWord botInd,oldBotInd,currInd; // indices in array (always < size) + StgInt botInd,oldBotInd,currInd; // indices in array (always < size) const StgInfoTable *info; n = 0; @@ -111,7 +111,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) // stealing is happening during GC. pool->bottom -= pool->top & ~pool->moduloSize; pool->top &= pool->moduloSize; - pool->topBound = pool->top; debugTrace(DEBUG_sparks, "markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)", @@ -259,7 +258,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap) ASSERT(currInd == oldBotInd); pool->top = oldBotInd; // where we started writing - pool->topBound = pool->top; pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size); // first free place we did not use (corrected by wraparound) diff --git a/rts/SpinLock.c b/rts/SpinLock.c new file mode 100644 index 0000000000..5289694aa7 --- /dev/null +++ b/rts/SpinLock.c @@ -0,0 +1,41 @@ +/* ---------------------------------------------------------------------------- + * + * (c) The GHC Team, 2006-2009 + * + * Spin locks + * + * These are simple spin-only locks as opposed to Mutexes which + * probably spin for a while before blocking in the kernel. We use + * these when we are sure that all our threads are actively running on + * a CPU, eg. in the GC. + * + * TODO: measure whether we really need these, or whether Mutexes + * would do (and be a bit safer if a CPU becomes loaded). + * + * Do not #include this file directly: #include "Rts.h" instead. + * + * To understand the structure of the RTS headers, see the wiki: + * https://gitlab.haskell.org/ghc/ghc/wikis/commentary/source-tree/includes + * + * -------------------------------------------------------------------------- */ + +#include "PosixSource.h" +#include "Rts.h" + +#if defined(THREADED_RTS) + +void acquire_spin_lock_slow_path(SpinLock * p) +{ + do { + for (uint32_t i = 0; i < SPIN_COUNT; i++) { + StgWord32 r = cas((StgVolatilePtr)&(p->lock), 1, 0); + if (r != 0) return; + IF_PROF_SPIN(RELAXED_ADD(&p->spin, 1)); + busy_wait_nop(); + } + IF_PROF_SPIN(RELAXED_ADD(&p->yield, 1)); + yieldThread(); + } while (1); +} + +#endif diff --git a/rts/StablePtr.c b/rts/StablePtr.c index edcd863183..469a17a5b9 100644 --- a/rts/StablePtr.c +++ b/rts/StablePtr.c @@ -191,9 +191,10 @@ enlargeStablePtrTable(void) /* When using the threaded RTS, the update of stable_ptr_table is assumed to * be atomic, so that another thread simultaneously dereferencing a stable - * pointer will always read a valid address. + * pointer will always read a valid address. Release ordering to ensure + * that the new table is visible to others. */ - stable_ptr_table = new_stable_ptr_table; + RELEASE_STORE(&stable_ptr_table, new_stable_ptr_table); initSpEntryFreeList(stable_ptr_table + old_SPT_size, old_SPT_size, NULL); } @@ -247,7 +248,7 @@ exitStablePtrTable(void) STATIC_INLINE void freeSpEntry(spEntry *sp) { - sp->addr = (P_)stable_ptr_free; + RELAXED_STORE(&sp->addr, (P_)stable_ptr_free); stable_ptr_free = sp; } @@ -279,7 +280,7 @@ getStablePtr(StgPtr p) if (!stable_ptr_free) enlargeStablePtrTable(); sp = stable_ptr_free - stable_ptr_table; stable_ptr_free = (spEntry*)(stable_ptr_free->addr); - stable_ptr_table[sp].addr = p; + RELAXED_STORE(&stable_ptr_table[sp].addr, p); stablePtrUnlock(); return (StgStablePtr)(sp); } diff --git a/rts/Task.c b/rts/Task.c index 11ba5f1581..9311a16ce0 100644 --- a/rts/Task.c +++ b/rts/Task.c @@ -30,6 +30,7 @@ Task *all_tasks = NULL; // current number of bound tasks + total number of worker tasks. +// Locks required: all_tasks_mutex. uint32_t taskCount; uint32_t workerCount; uint32_t currentWorkerCount; @@ -237,6 +238,8 @@ newTask (bool worker) all_tasks = task; taskCount++; + debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); + if (worker) { workerCount++; currentWorkerCount++; @@ -311,8 +314,6 @@ newBoundTask (void) task->stopped = false; newInCall(task); - - debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount); return task; } @@ -473,7 +474,7 @@ startWorkerTask (Capability *cap) // else get in, because the new worker Task has nowhere to go to // sleep so that it could be woken up again. ASSERT_LOCK_HELD(&cap->lock); - cap->running_task = task; + RELAXED_STORE(&cap->running_task, task); // Set the name of the worker thread to the original process name followed by // ":w", but only if we're on Linux where the program_invocation_short_name diff --git a/rts/ThreadPaused.c b/rts/ThreadPaused.c index 83c621e386..13fc2b4ca0 100644 --- a/rts/ThreadPaused.c +++ b/rts/ThreadPaused.c @@ -243,7 +243,7 @@ threadPaused(Capability *cap, StgTSO *tso) SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info); bh = ((StgUpdateFrame *)frame)->updatee; - bh_info = bh->header.info; + bh_info = ACQUIRE_LOAD(&bh->header.info); IF_NONMOVING_WRITE_BARRIER_ENABLED { updateRemembSetPushClosure(cap, (StgClosure *) bh); } @@ -287,7 +287,7 @@ threadPaused(Capability *cap, StgTSO *tso) // suspended by this mechanism. See Note [AP_STACKs must be eagerly // blackholed] for details. if (((bh_info == &stg_BLACKHOLE_info) - && ((StgInd*)bh)->indirectee != (StgClosure*)tso) + && (RELAXED_LOAD(&((StgInd*)bh)->indirectee) != (StgClosure*)tso)) || (bh_info == &stg_WHITEHOLE_info)) { debugTrace(DEBUG_squeeze, @@ -331,7 +331,7 @@ threadPaused(Capability *cap, StgTSO *tso) if (cur_bh_info != bh_info) { bh_info = cur_bh_info; #if defined(PROF_SPIN) - ++whitehole_threadPaused_spin; + NONATOMIC_ADD(&whitehole_threadPaused_spin, 1); #endif busy_wait_nop(); goto retry; @@ -351,9 +351,8 @@ threadPaused(Capability *cap, StgTSO *tso) } // The payload of the BLACKHOLE points to the TSO - ((StgInd *)bh)->indirectee = (StgClosure *)tso; - write_barrier(); - SET_INFO(bh,&stg_BLACKHOLE_info); + RELAXED_STORE(&((StgInd *)bh)->indirectee, (StgClosure *)tso); + SET_INFO_RELEASE(bh,&stg_BLACKHOLE_info); // .. and we need a write barrier, since we just mutated the closure: recordClosureMutated(cap,bh); diff --git a/rts/Threads.c b/rts/Threads.c index 54c703963e..6050549d64 100644 --- a/rts/Threads.c +++ b/rts/Threads.c @@ -277,8 +277,6 @@ tryWakeupThread (Capability *cap, StgTSO *tso) msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup)); msg->tso = tso; SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM); - // Ensure that writes constructing Message are committed before sending. - write_barrier(); sendMessage(cap, tso->cap, (Message*)msg); debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d", (W_)tso->id, tso->cap->no); @@ -355,9 +353,14 @@ unblock: migrateThread ------------------------------------------------------------------------- */ +// Precondition: The caller must own the `from` capability. void migrateThread (Capability *from, StgTSO *tso, Capability *to) { + // Sadly we can't assert this since migrateThread is called from + // scheduleDoGC, where we implicly own all capabilities. + //ASSERT_FULL_CAPABILITY_INVARIANTS(from, getTask()); + traceEventMigrateThread (from, tso, to->no); // ThreadMigrating tells the target cap that it needs to be added to // the run queue when it receives the MSG_TRY_WAKEUP. @@ -383,8 +386,7 @@ wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq) for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE; msg = msg->link) { - i = msg->header.info; - load_load_barrier(); + i = ACQUIRE_LOAD(&msg->header.info); if (i != &stg_IND_info) { ASSERT(i == &stg_MSG_BLACKHOLE_info); tryWakeupThread(cap,msg->tso); @@ -414,8 +416,7 @@ checkBlockingQueues (Capability *cap, StgTSO *tso) for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) { next = bq->link; - const StgInfoTable *bqinfo = bq->header.info; - load_load_barrier(); // XXX: Is this needed? + const StgInfoTable *bqinfo = ACQUIRE_LOAD(&bq->header.info); if (bqinfo == &stg_IND_info) { // ToDo: could short it out right here, to avoid // traversing this IND multiple times. @@ -423,8 +424,7 @@ checkBlockingQueues (Capability *cap, StgTSO *tso) } p = bq->bh; - const StgInfoTable *pinfo = p->header.info; - load_load_barrier(); + const StgInfoTable *pinfo = ACQUIRE_LOAD(&p->header.info); if (pinfo != &stg_BLACKHOLE_info || ((StgInd *)p)->indirectee != (StgClosure*)bq) { @@ -448,8 +448,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) StgTSO *owner; const StgInfoTable *i; - i = thunk->header.info; - load_load_barrier(); + i = ACQUIRE_LOAD(&thunk->header.info); if (i != &stg_BLACKHOLE_info && i != &stg_CAF_BLACKHOLE_info && i != &__stg_EAGER_BLACKHOLE_info && @@ -469,8 +468,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val) return; } - i = v->header.info; - load_load_barrier(); + i = ACQUIRE_LOAD(&v->header.info); if (i == &stg_TSO_info) { checkBlockingQueues(cap, tso); return; @@ -796,8 +794,7 @@ loop: return true; } - qinfo = q->header.info; - load_load_barrier(); + qinfo = ACQUIRE_LOAD(&q->header.info); if (qinfo == &stg_IND_info || qinfo == &stg_MSG_NULL_info) { q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee; @@ -814,15 +811,15 @@ loop: ASSERT(tso->block_info.closure == (StgClosure*)mvar); // save why_blocked here, because waking up the thread destroys // this information - StgWord why_blocked = tso->why_blocked; + StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked); // actually perform the takeMVar StgStack* stack = tso->stackobj; - stack->sp[1] = (W_)value; - stack->sp[0] = (W_)&stg_ret_p_info; + RELAXED_STORE(&stack->sp[1], (W_)value); + RELAXED_STORE(&stack->sp[0], (W_)&stg_ret_p_info); // indicate that the MVar operation has now completed. - tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure; + RELEASE_STORE(&tso->_link, (StgTSO*)&stg_END_TSO_QUEUE_closure); if ((stack->dirty & STACK_DIRTY) == 0) { dirty_STACK(cap, stack); diff --git a/rts/Timer.c b/rts/Timer.c index 057cffc7c9..97d87ad989 100644 --- a/rts/Timer.c +++ b/rts/Timer.c @@ -25,6 +25,15 @@ #include "Capability.h" #include "RtsSignals.h" +// This global counter is used to allow multiple threads to stop the +// timer temporarily with a stopTimer()/startTimer() pair. If +// timer_enabled == 0 timer is enabled +// timer_disabled == N, N > 0 timer is disabled by N threads +// When timer_enabled makes a transition to 0, we enable the timer, +// and when it makes a transition to non-0 we disable it. + +static StgWord timer_disabled; + /* ticks left before next pre-emptive context switch */ static int ticks_to_ctxt_switch = 0; @@ -92,7 +101,9 @@ void handle_tick(int unused STG_UNUSED) { handleProfTick(); - if (RtsFlags.ConcFlags.ctxtSwitchTicks > 0) { + if (RtsFlags.ConcFlags.ctxtSwitchTicks > 0 + && SEQ_CST_LOAD(&timer_disabled) == 0) + { ticks_to_ctxt_switch--; if (ticks_to_ctxt_switch <= 0) { ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks; @@ -106,16 +117,16 @@ handle_tick(int unused STG_UNUSED) * for threads that are deadlocked. However, ensure we wait * at least interIdleGCWait (+RTS -Iw) between idle GCs. */ - switch (recent_activity) { + switch (SEQ_CST_LOAD(&recent_activity)) { case ACTIVITY_YES: - recent_activity = ACTIVITY_MAYBE_NO; + SEQ_CST_STORE(&recent_activity, ACTIVITY_MAYBE_NO); idle_ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime / RtsFlags.MiscFlags.tickInterval; break; case ACTIVITY_MAYBE_NO: if (idle_ticks_to_gc == 0 && inter_gc_ticks_to_gc == 0) { if (RtsFlags.GcFlags.doIdleGC) { - recent_activity = ACTIVITY_INACTIVE; + SEQ_CST_STORE(&recent_activity, ACTIVITY_INACTIVE); inter_gc_ticks_to_gc = RtsFlags.GcFlags.interIdleGCWait / RtsFlags.MiscFlags.tickInterval; #if defined(THREADED_RTS) @@ -124,7 +135,7 @@ handle_tick(int unused STG_UNUSED) // the GC. #endif } else { - recent_activity = ACTIVITY_DONE_GC; + SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC); // disable timer signals (see #1623, #5991, #9105) // but only if we're not profiling (e.g. passed -h or -p RTS // flags). If we are profiling we need to keep the timer active @@ -148,15 +159,6 @@ handle_tick(int unused STG_UNUSED) } } -// This global counter is used to allow multiple threads to stop the -// timer temporarily with a stopTimer()/startTimer() pair. If -// timer_enabled == 0 timer is enabled -// timer_disabled == N, N > 0 timer is disabled by N threads -// When timer_enabled makes a transition to 0, we enable the timer, -// and when it makes a transition to non-0 we disable it. - -static StgWord timer_disabled; - void initTimer(void) { @@ -164,7 +166,7 @@ initTimer(void) if (RtsFlags.MiscFlags.tickInterval != 0) { initTicker(RtsFlags.MiscFlags.tickInterval, handle_tick); } - timer_disabled = 1; + SEQ_CST_STORE(&timer_disabled, 1); } void diff --git a/rts/Updates.h b/rts/Updates.h index 608aaff524..aa5fbe0133 100644 --- a/rts/Updates.h +++ b/rts/Updates.h @@ -76,7 +76,6 @@ INLINE_HEADER void updateWithIndirection (Capability *cap, /* not necessarily true: ASSERT( !closure_IND(p1) ); */ /* occurs in RaiseAsync.c:raiseAsync() */ /* See Note [Heap memory barriers] in SMP.h */ - write_barrier(); bdescr *bd = Bdescr((StgPtr)p1); if (bd->gen_no != 0) { IF_NONMOVING_WRITE_BARRIER_ENABLED { @@ -88,9 +87,8 @@ INLINE_HEADER void updateWithIndirection (Capability *cap, TICK_UPD_NEW_IND(); } OVERWRITING_CLOSURE(p1); - ((StgInd *)p1)->indirectee = p2; - write_barrier(); - SET_INFO(p1, &stg_BLACKHOLE_info); + RELEASE_STORE(&((StgInd *)p1)->indirectee, p2); + SET_INFO_RELEASE(p1, &stg_BLACKHOLE_info); LDV_RECORD_CREATE(p1); } diff --git a/rts/WSDeque.c b/rts/WSDeque.c index 60b8948149..d930d848a4 100644 --- a/rts/WSDeque.c +++ b/rts/WSDeque.c @@ -11,7 +11,15 @@ * SPAA'05, July 2005, Las Vegas, USA. * ACM 1-58113-986-1/05/0007 * + * This implementation closely follows the C11 implementation presented in + * + * N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient + * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013, + * ACM 978-1-4503-1922/13/02. + * * Author: Jost Berthold MSRC 07-09/2008 + * Rewritten by: Ben Gamari (Well-Typed) + * * * The DeQue is held as a circular array with known length. Positions * of top (read-end) and bottom (write-end) always increase, and the @@ -44,7 +52,13 @@ #include "RtsUtils.h" #include "WSDeque.h" -#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new))) +// Returns true on success. +static inline bool +cas_top(WSDeque *q, StgInt old, StgInt new) +{ + return (StgWord) old == cas((StgPtr) &q->top, (StgWord) old, (StgWord) new); +} + /* ----------------------------------------------------------------------------- * newWSDeque @@ -80,13 +94,12 @@ newWSDeque (uint32_t size) "newWSDeque"); q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */ "newWSDeque:data space"); - q->top=0; - q->bottom=0; - q->topBound=0; /* read by writer, updated each time top is read */ - q->size = realsize; /* power of 2 */ q->moduloSize = realsize - 1; /* n % size == n & moduloSize */ + q->top=0; + RELEASE_STORE(&q->bottom, 0); /* read by writer, updated each time top is read */ + ASSERT_WSDEQUE_INVARIANTS(q); return q; } @@ -118,56 +131,31 @@ freeWSDeque (WSDeque *q) void * popWSDeque (WSDeque *q) { - /* also a bit tricky, has to avoid concurrent steal() calls by - accessing top with cas, when there is only one element left */ - StgWord t, b; - long currSize; - void * removed; - - ASSERT_WSDEQUE_INVARIANTS(q); - - b = q->bottom; - - // "decrement b as a test, see what happens" - b--; - q->bottom = b; - - // very important that the following read of q->top does not occur - // before the earlier write to q->bottom. - store_load_barrier(); - - t = q->top; /* using topBound would give an *upper* bound, we - need a lower bound. We use the real top here, but - can update the topBound value */ - q->topBound = t; - currSize = (long)b - (long)t; - if (currSize < 0) { /* was empty before decrementing b, set b - consistently and abort */ - q->bottom = t; - return NULL; - } - - // read the element at b - removed = q->elements[b & q->moduloSize]; - - if (currSize > 0) { /* no danger, still elements in buffer after b-- */ - // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed); - return removed; - } - /* otherwise, has someone meanwhile stolen the same (last) element? - Check and increment top value to know */ - if ( !(CASTOP(&(q->top),t,t+1)) ) { - removed = NULL; /* no success, but continue adjusting bottom */ + StgInt b = RELAXED_LOAD(&q->bottom) - 1; + RELAXED_STORE(&q->bottom, b); + SEQ_CST_FENCE(); + StgInt t = RELAXED_LOAD(&q->top); + + void *result; + if (t <= b) { + /* Non-empty */ + result = RELAXED_LOAD(&q->elements[b & q->moduloSize]); + if (t == b) { + /* Single last element in queue */ + if (!cas_top(q, t, t+1)) { + /* Failed race */ + result = NULL; + } + + RELAXED_STORE(&q->bottom, b+1); + } + } else { + /* Empty queue */ + result = NULL; + RELAXED_STORE(&q->bottom, b+1); } - q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */ - q->topBound = t+1; /* ...and cached top value as well */ - ASSERT_WSDEQUE_INVARIANTS(q); - ASSERT(q->bottom >= q->top); - - // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed); - - return removed; + return result; } /* ----------------------------------------------------------------------------- @@ -177,43 +165,19 @@ popWSDeque (WSDeque *q) void * stealWSDeque_ (WSDeque *q) { - void * stolen; - StgWord b,t; - -// Can't do this on someone else's spark pool: -// ASSERT_WSDEQUE_INVARIANTS(q); - - // NB. these loads must be ordered, otherwise there is a race - // between steal and pop. - t = q->top; - load_load_barrier(); - b = q->bottom; - - // NB. b and t are unsigned; we need a signed value for the test - // below, because it is possible that t > b during a - // concurrent popWSQueue() operation. - if ((long)b - (long)t <= 0 ) { - return NULL; /* already looks empty, abort */ + StgInt t = ACQUIRE_LOAD(&q->top); + SEQ_CST_FENCE(); + StgInt b = ACQUIRE_LOAD(&q->bottom); + + void *result = NULL; + if (t < b) { + /* Non-empty queue */ + result = RELAXED_LOAD(&q->elements[t % q->size]); + if (!cas_top(q, t, t+1)) { + return NULL; + } } - // NB. the load of q->bottom must be ordered before the load of - // q->elements[t & q-> moduloSize]. See comment "KG:..." below - // and Ticket #13633. - load_load_barrier(); - /* now access array, see pushBottom() */ - stolen = q->elements[t & q->moduloSize]; - - /* now decide whether we have won */ - if ( !(CASTOP(&(q->top),t,t+1)) ) { - /* lost the race, someone else has changed top in the meantime */ - return NULL; - } /* else: OK, top has been incremented by the cas call */ - - // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b); - -// Can't do this on someone else's spark pool: -// ASSERT_WSDEQUE_INVARIANTS(q); - - return stolen; + return result; } void * @@ -232,67 +196,30 @@ stealWSDeque (WSDeque *q) * pushWSQueue * -------------------------------------------------------------------------- */ -#define DISCARD_NEW - -/* enqueue an element. Should always succeed by resizing the array - (not implemented yet, silently fails in that case). */ +/* Enqueue an element. Must only be called by owner. Returns true if element was + * pushed, false if queue is full + */ bool pushWSDeque (WSDeque* q, void * elem) { - StgWord t; - StgWord b; - StgWord sz = q->moduloSize; + StgInt b = ACQUIRE_LOAD(&q->bottom); + StgInt t = ACQUIRE_LOAD(&q->top); - ASSERT_WSDEQUE_INVARIANTS(q); - - /* we try to avoid reading q->top (accessed by all) and use - q->topBound (accessed only by writer) instead. - This is why we do not just call empty(q) here. - */ - b = q->bottom; - t = q->topBound; - if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) { - /* NB. 1. sz == q->size - 1, thus ">=" - 2. signed comparison, it is possible that t > b - */ - /* could be full, check the real top value in this case */ - t = q->top; - q->topBound = t; - if (b - t >= sz) { /* really no space left :-( */ - /* reallocate the array, copying the values. Concurrent steal()s - will in the meantime use the old one and modify only top. - This means: we cannot safely free the old space! Can keep it - on a free list internally here... + if ( b - t > q->size - 1 ) { + /* Full queue */ + /* We don't implement resizing, just say we didn't push anything. */ + return false; + } - Potential bug in combination with steal(): if array is - replaced, it is unclear which one concurrent steal operations - use. Must read the array base address in advance in steal(). - */ -#if defined(DISCARD_NEW) - ASSERT_WSDEQUE_INVARIANTS(q); - return false; // we didn't push anything + RELAXED_STORE(&q->elements[b & q->moduloSize], elem); +#if defined(TSAN_ENABLED) + // ThreadSanizer doesn't know about release fences, so we need to + // strengthen this to a release store lest we get spurious data race + // reports. + RELEASE_STORE(&q->bottom, b+1); #else - /* could make room by incrementing the top position here. In - * this case, should use CASTOP. If this fails, someone else has - * removed something, and new room will be available. - */ - ASSERT_WSDEQUE_INVARIANTS(q); + RELEASE_FENCE(); + RELAXED_STORE(&q->bottom, b+1); #endif - } - } - - q->elements[b & sz] = elem; - /* - KG: we need to put write barrier here since otherwise we might - end with elem not added to q->elements, but q->bottom already - modified (write reordering) and with stealWSDeque_ failing - later when invoked from another thread since it thinks elem is - there (in case there is just added element in the queue). This - issue concretely hit me on ARMv7 multi-core CPUs - */ - write_barrier(); - q->bottom = b + 1; - - ASSERT_WSDEQUE_INVARIANTS(q); return true; } diff --git a/rts/WSDeque.h b/rts/WSDeque.h index 2936c281fe..0104884bdb 100644 --- a/rts/WSDeque.h +++ b/rts/WSDeque.h @@ -11,24 +11,19 @@ typedef struct WSDeque_ { // Size of elements array. Used for modulo calculation: we round up // to powers of 2 and use the dyadic log (modulo == bitwise &) - StgWord size; + StgInt size; StgWord moduloSize; /* bitmask for modulo */ // top, index where multiple readers steal() (protected by a cas) - volatile StgWord top; + StgInt top; // bottom, index of next free place where one writer can push // elements. This happens unsynchronised. - volatile StgWord bottom; + StgInt bottom; // both top and bottom are continuously incremented, and used as // an index modulo the current array size. - // lower bound on the current top value. This is an internal - // optimisation to avoid unnecessarily accessing the top field - // inside pushBottom - volatile StgWord topBound; - // The elements array void ** elements; @@ -39,18 +34,17 @@ typedef struct WSDeque_ { } WSDeque; /* INVARIANTS, in this order: reasonable size, - topBound consistent, space pointer, space accessible to us. + space pointer, space accessible to us. NB. This is safe to use only (a) on a spark pool owned by the current thread, or (b) when there's only one thread running, or no stealing going on (e.g. during GC). */ -#define ASSERT_WSDEQUE_INVARIANTS(p) \ - ASSERT((p)->size > 0); \ - ASSERT((p)->topBound <= (p)->top); \ - ASSERT((p)->elements != NULL); \ - ASSERT(*((p)->elements) || 1); \ - ASSERT(*((p)->elements - 1 + ((p)->size)) || 1); +#define ASSERT_WSDEQUE_INVARIANTS(p) \ + ASSERT((p)->size > 0); \ + ASSERT(RELAXED_LOAD(&(p)->elements) != NULL); \ + ASSERT(RELAXED_LOAD(&(p)->elements[0]) || 1); \ + ASSERT(RELAXED_LOAD(&(p)->elements[(p)->size - 1]) || 1); // No: it is possible that top > bottom when using pop() // ASSERT((p)->bottom >= (p)->top); @@ -69,15 +63,15 @@ typedef struct WSDeque_ { WSDeque * newWSDeque (uint32_t size); void freeWSDeque (WSDeque *q); -// Take an element from the "write" end of the pool. Can be called +// (owner-only) Take an element from the "write" end of the pool. Can be called // by the pool owner only. void* popWSDeque (WSDeque *q); -// Push onto the "write" end of the pool. Return true if the push +// (owner-only) Push onto the "write" end of the pool. Return true if the push // succeeded, or false if the deque is full. bool pushWSDeque (WSDeque *q, void *elem); -// Removes all elements from the deque +// (owner-only) Removes all elements from the deque. EXTERN_INLINE void discardElements (WSDeque *q); // Removes an element of the deque from the "read" end, or returns @@ -90,23 +84,27 @@ void * stealWSDeque_ (WSDeque *q); void * stealWSDeque (WSDeque *q); // "guesses" whether a deque is empty. Can return false negatives in -// presence of concurrent steal() calls, and false positives in -// presence of a concurrent pushBottom(). +// presence of concurrent steal() calls, and false positives in +// presence of a concurrent pushBottom(). EXTERN_INLINE bool looksEmptyWSDeque (WSDeque* q); -EXTERN_INLINE long dequeElements (WSDeque *q); +// "guesses" how many elements are present on the deque. Like +// looksEmptyWSDeque, this may suggest that the deque is empty when it's not +// and vice-versa. +EXTERN_INLINE StgInt dequeElements (WSDeque *q); /* ----------------------------------------------------------------------------- * PRIVATE below here * -------------------------------------------------------------------------- */ -EXTERN_INLINE long +EXTERN_INLINE StgInt dequeElements (WSDeque *q) { - StgWord t = q->top; - StgWord b = q->bottom; + StgWord t = ACQUIRE_LOAD(&q->top); + StgWord b = ACQUIRE_LOAD(&q->bottom); // try to prefer false negatives by reading top first - return ((long)b - (long)t); + StgInt n = (StgInt)b - (StgInt)t; + return n > 0 ? n : 0; } EXTERN_INLINE bool @@ -118,6 +116,5 @@ looksEmptyWSDeque (WSDeque *q) EXTERN_INLINE void discardElements (WSDeque *q) { - q->top = q->bottom; -// pool->topBound = pool->top; + RELAXED_STORE(&q->top, RELAXED_LOAD(&q->bottom)); } diff --git a/rts/Weak.c b/rts/Weak.c index fe4516794a..0adf5a8b92 100644 --- a/rts/Weak.c +++ b/rts/Weak.c @@ -57,8 +57,7 @@ runAllCFinalizers(StgWeak *list) // If there's no major GC between the time that the finalizer for the // object from the oldest generation is manually called and shutdown // we end up running the same finalizer twice. See #7170. - const StgInfoTable *winfo = w->header.info; - load_load_barrier(); + const StgInfoTable *winfo = ACQUIRE_LOAD(&w->header.info); if (winfo != &stg_DEAD_WEAK_info) { runCFinalizers((StgCFinalizerList *)w->cfinalizers); } @@ -93,10 +92,10 @@ scheduleFinalizers(Capability *cap, StgWeak *list) StgWord size; uint32_t n, i; - // This assertion does not hold with non-moving collection because - // non-moving collector does not wait for the list to be consumed (by - // doIdleGcWork()) before appending the list with more finalizers. - ASSERT(RtsFlags.GcFlags.useNonmoving || n_finalizers == 0); + // n_finalizers is not necessarily zero under non-moving collection + // because non-moving collector does not wait for the list to be consumed + // (by doIdleGcWork()) before appending the list with more finalizers. + ASSERT(RtsFlags.GcFlags.useNonmoving || SEQ_CST_LOAD(&n_finalizers) == 0); // Append finalizer_list with the new list. TODO: Perhaps cache tail of the // list for faster append. NOTE: We can't append `list` here! Otherwise we @@ -105,7 +104,7 @@ scheduleFinalizers(Capability *cap, StgWeak *list) while (*tl) { tl = &(*tl)->link; } - *tl = list; + SEQ_CST_STORE(tl, list); // Traverse the list and // * count the number of Haskell finalizers @@ -140,7 +139,7 @@ scheduleFinalizers(Capability *cap, StgWeak *list) SET_HDR(w, &stg_DEAD_WEAK_info, w->header.prof.ccs); } - n_finalizers += i; + SEQ_CST_ADD(&n_finalizers, i); // No Haskell finalizers to run? if (n == 0) return; @@ -226,7 +225,7 @@ static volatile StgWord finalizer_lock = 0; // bool runSomeFinalizers(bool all) { - if (n_finalizers == 0) + if (RELAXED_LOAD(&n_finalizers) == 0) return false; if (cas(&finalizer_lock, 0, 1) != 0) { @@ -252,17 +251,15 @@ bool runSomeFinalizers(bool all) if (!all && count >= finalizer_chunk) break; } - finalizer_list = w; - n_finalizers -= count; + RELAXED_STORE(&finalizer_list, w); + SEQ_CST_ADD(&n_finalizers, -count); if (task != NULL) { task->running_finalizers = false; } debugTrace(DEBUG_sched, "ran %d C finalizers", count); - - write_barrier(); - finalizer_lock = 0; - - return n_finalizers != 0; + bool ret = n_finalizers != 0; + RELEASE_STORE(&finalizer_lock, 0); + return ret; } diff --git a/rts/posix/GetTime.c b/rts/posix/GetTime.c index 0128e3bc8b..7d53f95401 100644 --- a/rts/posix/GetTime.c +++ b/rts/posix/GetTime.c @@ -85,7 +85,9 @@ Time getCurrentThreadCPUTime(void) defined(CLOCK_PROCESS_CPUTIME_ID) && \ defined(HAVE_SYSCONF) static bool have_checked_usability = false; - if (!have_checked_usability) { + // The RELAXED operation is fine here as it's okay if we do the check below + // more than once. + if (!RELAXED_LOAD(&have_checked_usability)) { // The Linux clock_getres(2) manpage claims that some early versions of // Linux will return values which are uninterpretable in the presence // of migration across CPUs. They claim that clock_getcpuclockid(0) @@ -95,7 +97,7 @@ Time getCurrentThreadCPUTime(void) sysErrorBelch("getCurrentThreadCPUTime: no supported"); stg_exit(EXIT_FAILURE); } - have_checked_usability = true; + RELAXED_STORE(&have_checked_usability, true); } return getClockTime(CLOCK_THREAD_CPUTIME_ID); #else diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c index c51ccfcafb..6347e8ce7a 100644 --- a/rts/posix/OSThreads.c +++ b/rts/posix/OSThreads.c @@ -398,6 +398,14 @@ interruptOSThread (OSThreadId id) pthread_kill(id, SIGPIPE); } +void +joinOSThread (OSThreadId id) +{ + if (pthread_join(id, NULL) != 0) { + sysErrorBelch("joinOSThread: error %d", errno); + } +} + KernelThreadId kernelThreadId (void) { #if defined(linux_HOST_OS) diff --git a/rts/posix/Signals.c b/rts/posix/Signals.c index 2e534042f3..5ad688bc2f 100644 --- a/rts/posix/Signals.c +++ b/rts/posix/Signals.c @@ -128,7 +128,7 @@ more_handlers(int sig) } // Here's the pipe into which we will send our signals -static volatile int io_manager_wakeup_fd = -1; +static int io_manager_wakeup_fd = -1; static int timer_manager_control_wr_fd = -1; #define IO_MANAGER_WAKEUP 0xff @@ -136,7 +136,7 @@ static int timer_manager_control_wr_fd = -1; #define IO_MANAGER_SYNC 0xfd void setTimerManagerControlFd(int fd) { - timer_manager_control_wr_fd = fd; + RELAXED_STORE(&timer_manager_control_wr_fd, fd); } void @@ -144,7 +144,7 @@ setIOManagerWakeupFd (int fd) { // only called when THREADED_RTS, but unconditionally // compiled here because GHC.Event.Control depends on it. - io_manager_wakeup_fd = fd; + SEQ_CST_STORE(&io_manager_wakeup_fd, fd); } /* ----------------------------------------------------------------------------- @@ -154,14 +154,15 @@ void ioManagerWakeup (void) { int r; + const int wakeup_fd = SEQ_CST_LOAD(&io_manager_wakeup_fd); // Wake up the IO Manager thread by sending a byte down its pipe - if (io_manager_wakeup_fd >= 0) { + if (wakeup_fd >= 0) { #if defined(HAVE_EVENTFD) StgWord64 n = (StgWord64)IO_MANAGER_WAKEUP; - r = write(io_manager_wakeup_fd, (char *) &n, 8); + r = write(wakeup_fd, (char *) &n, 8); #else StgWord8 byte = (StgWord8)IO_MANAGER_WAKEUP; - r = write(io_manager_wakeup_fd, &byte, 1); + r = write(wakeup_fd, &byte, 1); #endif /* N.B. If the TimerManager is shutting down as we run this * then there is a possibility that our first read of @@ -174,7 +175,7 @@ ioManagerWakeup (void) * Since this is not an error condition, we do not print the error * message in this case. */ - if (r == -1 && io_manager_wakeup_fd >= 0) { + if (r == -1 && SEQ_CST_LOAD(&io_manager_wakeup_fd) >= 0) { sysErrorBelch("ioManagerWakeup: write"); } } @@ -186,21 +187,27 @@ ioManagerDie (void) { StgWord8 byte = (StgWord8)IO_MANAGER_DIE; uint32_t i; - int fd; int r; - if (0 <= timer_manager_control_wr_fd) { - r = write(timer_manager_control_wr_fd, &byte, 1); - if (r == -1) { sysErrorBelch("ioManagerDie: write"); } - timer_manager_control_wr_fd = -1; - } - - for (i=0; i < n_capabilities; i++) { - fd = capabilities[i]->io_manager_control_wr_fd; + { + // Shut down timer manager + const int fd = RELAXED_LOAD(&timer_manager_control_wr_fd); if (0 <= fd) { r = write(fd, &byte, 1); if (r == -1) { sysErrorBelch("ioManagerDie: write"); } - capabilities[i]->io_manager_control_wr_fd = -1; + RELAXED_STORE(&timer_manager_control_wr_fd, -1); + } + } + + { + // Shut down IO managers + for (i=0; i < n_capabilities; i++) { + const int fd = RELAXED_LOAD(&capabilities[i]->io_manager_control_wr_fd); + if (0 <= fd) { + r = write(fd, &byte, 1); + if (r == -1) { sysErrorBelch("ioManagerDie: write"); } + RELAXED_STORE(&capabilities[i]->io_manager_control_wr_fd, -1); + } } } } @@ -216,7 +223,7 @@ ioManagerStart (void) { // Make sure the IO manager thread is running Capability *cap; - if (timer_manager_control_wr_fd < 0 || io_manager_wakeup_fd < 0) { + if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0 || SEQ_CST_LOAD(&io_manager_wakeup_fd) < 0) { cap = rts_lock(); ioManagerStartCap(&cap); rts_unlock(cap); @@ -258,9 +265,10 @@ generic_handler(int sig USED_IF_THREADS, memcpy(buf+1, info, sizeof(siginfo_t)); } - if (0 <= timer_manager_control_wr_fd) + int timer_control_fd = RELAXED_LOAD(&timer_manager_control_wr_fd); + if (0 <= timer_control_fd) { - r = write(timer_manager_control_wr_fd, buf, sizeof(siginfo_t)+1); + r = write(timer_control_fd, buf, sizeof(siginfo_t)+1); if (r == -1 && errno == EAGAIN) { errorBelch("lost signal due to full pipe: %d\n", sig); } diff --git a/rts/posix/itimer/Pthread.c b/rts/posix/itimer/Pthread.c index dd36137b72..82379b9172 100644 --- a/rts/posix/itimer/Pthread.c +++ b/rts/posix/itimer/Pthread.c @@ -85,11 +85,11 @@ static Time itimer_interval = DEFAULT_TICK_INTERVAL; // Should we be firing ticks? // Writers to this must hold the mutex below. -static volatile bool stopped = false; +static bool stopped = false; // should the ticker thread exit? // This can be set without holding the mutex. -static volatile bool exited = true; +static bool exited = true; // Signaled when we want to (re)start the timer static Condition start_cond; @@ -120,7 +120,9 @@ static void *itimer_thread_func(void *_handle_tick) } #endif - while (!exited) { + // Relaxed is sufficient: If we don't see that exited was set in one iteration we will + // see it next time. + while (!RELAXED_LOAD(&exited)) { if (USE_TIMERFD_FOR_ITIMER) { ssize_t r = read(timerfd, &nticks, sizeof(nticks)); if ((r == 0) && (errno == 0)) { @@ -142,7 +144,8 @@ static void *itimer_thread_func(void *_handle_tick) } // first try a cheap test - if (stopped) { + TSAN_ANNOTATE_BENIGN_RACE(&stopped, "itimer_thread_func"); + if (RELAXED_LOAD(&stopped)) { OS_ACQUIRE_LOCK(&mutex); // should we really stop? if (stopped) { @@ -186,7 +189,7 @@ void startTicker(void) { OS_ACQUIRE_LOCK(&mutex); - stopped = 0; + RELAXED_STORE(&stopped, false); signalCondition(&start_cond); OS_RELEASE_LOCK(&mutex); } @@ -196,7 +199,7 @@ void stopTicker(void) { OS_ACQUIRE_LOCK(&mutex); - stopped = 1; + RELAXED_STORE(&stopped, true); OS_RELEASE_LOCK(&mutex); } @@ -204,8 +207,8 @@ stopTicker(void) void exitTicker (bool wait) { - ASSERT(!exited); - exited = true; + ASSERT(!SEQ_CST_LOAD(&exited)); + SEQ_CST_STORE(&exited, true); // ensure that ticker wakes up if stopped startTicker(); diff --git a/rts/rts.cabal.in b/rts/rts.cabal.in index 08ebd3d7bf..12a4d68e4a 100644 --- a/rts/rts.cabal.in +++ b/rts/rts.cabal.in @@ -462,6 +462,7 @@ library STM.c Schedule.c Sparks.c + SpinLock.c StableName.c StablePtr.c StaticPtrTable.c diff --git a/rts/sm/BlockAlloc.c b/rts/sm/BlockAlloc.c index 2bf497197e..451c182ac3 100644 --- a/rts/sm/BlockAlloc.c +++ b/rts/sm/BlockAlloc.c @@ -787,6 +787,26 @@ free_mega_group (bdescr *mg) } +/* Note [Data races in freeGroup] + * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ + * freeGroup commits a rather serious concurrency sin in its block coalescence + * logic: When freeing a block it looks at bd->free of the previous/next block + * to see whether it is allocated. However, the free'ing thread likely does not + * own the previous/next block, nor do we make any attempt to synchronize with + * the thread that *does* own it; this makes this access a data race. + * + * The original design argued that this was correct because `bd->free` will + * only take a value of -1 when the block is free and thereby owned by the + * storage manager. However, this is nevertheless unsafe under the C11 data + * model, which guarantees no particular semantics for data races. + * + * We currently assume (and hope) we won't see torn values and consequently + * we will never see `bd->free == -1` for an allocated block which we do not + * own. However, this is all extremely dodgy. + * + * This is tracked as #18913. + */ + void freeGroup(bdescr *p) { @@ -796,7 +816,7 @@ freeGroup(bdescr *p) // not true in multithreaded GC: // ASSERT_SM_LOCK(); - ASSERT(p->free != (P_)-1); + ASSERT(RELAXED_LOAD(&p->free) != (P_)-1); #if defined(DEBUG) for (uint32_t i=0; i < p->blocks; i++) { @@ -806,9 +826,9 @@ freeGroup(bdescr *p) node = p->node; - p->free = (void *)-1; /* indicates that this block is free */ - p->gen = NULL; - p->gen_no = 0; + RELAXED_STORE(&p->free, (void *) -1); /* indicates that this block is free */ + RELAXED_STORE(&p->gen, NULL); + RELAXED_STORE(&p->gen_no, 0); /* fill the block group with garbage if sanity checking is on */ IF_DEBUG(zero_on_gc, memset(p->start, 0xaa, (W_)p->blocks * BLOCK_SIZE)); @@ -834,7 +854,11 @@ freeGroup(bdescr *p) { bdescr *next; next = p + p->blocks; - if (next <= LAST_BDESCR(MBLOCK_ROUND_DOWN(p)) && next->free == (P_)-1) + + // See Note [Data races in freeGroup]. + TSAN_ANNOTATE_BENIGN_RACE(&next->free, "freeGroup"); + if (next <= LAST_BDESCR(MBLOCK_ROUND_DOWN(p)) + && RELAXED_LOAD(&next->free) == (P_)-1) { p->blocks += next->blocks; ln = log_2(next->blocks); @@ -855,7 +879,9 @@ freeGroup(bdescr *p) prev = p - 1; if (prev->blocks == 0) prev = prev->link; // find the head - if (prev->free == (P_)-1) + // See Note [Data races in freeGroup]. + TSAN_ANNOTATE_BENIGN_RACE(&prev->free, "freeGroup"); + if (RELAXED_LOAD(&prev->free) == (P_)-1) { ln = log_2(prev->blocks); dbl_link_remove(prev, &free_list[node][ln]); diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c index 0ece06016a..b324a59179 100644 --- a/rts/sm/Evac.c +++ b/rts/sm/Evac.c @@ -171,7 +171,11 @@ copy_tag(StgClosure **p, const StgInfoTable *info, #endif return evacuate(p); // does the failed_to_evac stuff } else { - *p = TAG_CLOSURE(tag,(StgClosure*)to); + // This doesn't need to have RELEASE ordering since we are guaranteed + // to scavenge the to-space object on the current core therefore + // no-one else will follow this pointer (FIXME: Is this true in + // light of the selector optimization?). + RELEASE_STORE(p, TAG_CLOSURE(tag,(StgClosure*)to)); } } #else @@ -206,9 +210,9 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info, // if somebody else reads the forwarding pointer, we better make // sure there's a closure at the end of it. - write_barrier(); - *p = TAG_CLOSURE(tag,(StgClosure*)to); - src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to); + RELEASE_STORE(p, TAG_CLOSURE(tag,(StgClosure*)to)); + RELEASE_STORE(&src->header.info, \ + (const StgInfoTable *)MK_FORWARDING_PTR(to)); // if (to+size+2 < bd->start + BLOCK_SIZE_W) { // __builtin_prefetch(to + size + 2, 1); @@ -245,7 +249,7 @@ spin: goto spin; } if (IS_FORWARDING_PTR(info)) { - src->header.info = (const StgInfoTable *)info; + RELEASE_STORE(&src->header.info, (const StgInfoTable *)info); evacuate(p); // does the failed_to_evac stuff return false; } @@ -261,9 +265,8 @@ spin: to[i] = from[i]; } - write_barrier(); - *p = (StgClosure *)to; - src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to); + RELEASE_STORE(p, (StgClosure *) to); + RELEASE_STORE(&src->header.info, (const StgInfoTable*)MK_FORWARDING_PTR(to)); #if defined(PROFILING) // We store the size of the just evacuated object in the LDV word so that @@ -306,12 +309,12 @@ evacuate_large(StgPtr p) gen_workspace *ws; bd = Bdescr(p); - gen = bd->gen; - gen_no = bd->gen_no; + gen = RELAXED_LOAD(&bd->gen); + gen_no = RELAXED_LOAD(&bd->gen_no); ACQUIRE_SPIN_LOCK(&gen->sync); // already evacuated? - if (bd->flags & BF_EVACUATED) { + if (RELAXED_LOAD(&bd->flags) & BF_EVACUATED) { /* Don't forget to set the gct->failed_to_evac flag if we didn't get * the desired destination (see comments in evacuate()). */ @@ -344,9 +347,9 @@ evacuate_large(StgPtr p) ws = &gct->gens[new_gen_no]; new_gen = &generations[new_gen_no]; - bd->flags |= BF_EVACUATED; + __atomic_fetch_or(&bd->flags, BF_EVACUATED, __ATOMIC_ACQ_REL); if (RTS_UNLIKELY(RtsFlags.GcFlags.useNonmoving && new_gen == oldest_gen)) { - bd->flags |= BF_NONMOVING; + __atomic_fetch_or(&bd->flags, BF_NONMOVING, __ATOMIC_ACQ_REL); } initBdescr(bd, new_gen, new_gen->to); @@ -354,7 +357,7 @@ evacuate_large(StgPtr p) // these objects, because they aren't allowed to contain any outgoing // pointers. For these blocks, we skip the scavenge stage and put // them straight on the scavenged_large_objects list. - if (bd->flags & BF_PINNED) { + if (RELAXED_LOAD(&bd->flags) & BF_PINNED) { ASSERT(get_itbl((StgClosure *)p)->type == ARR_WORDS); if (new_gen != gen) { ACQUIRE_SPIN_LOCK(&new_gen->sync); } @@ -389,7 +392,7 @@ evacuate_static_object (StgClosure **link_field, StgClosure *q) return; } - StgWord link = (StgWord)*link_field; + StgWord link = RELAXED_LOAD((StgWord*) link_field); // See Note [STATIC_LINK fields] for how the link field bits work if (((link & STATIC_BITS) | prev_static_flag) != 3) { @@ -435,7 +438,7 @@ evacuate_compact (StgPtr p) bd = Bdescr((StgPtr)str); gen_no = bd->gen_no; - if (bd->flags & BF_NONMOVING) { + if (RELAXED_LOAD(&bd->flags) & BF_NONMOVING) { // We may have evacuated the block to the nonmoving generation. If so // we need to make sure it is added to the mark queue since the only // reference to it may be from the moving heap. @@ -500,7 +503,7 @@ evacuate_compact (StgPtr p) // in the GC, and that should never see blocks other than the first) bd->flags |= BF_EVACUATED; if (RTS_UNLIKELY(RtsFlags.GcFlags.useNonmoving && new_gen == oldest_gen)) { - bd->flags |= BF_NONMOVING; + __atomic_fetch_or(&bd->flags, BF_NONMOVING, __ATOMIC_RELAXED); } initBdescr(bd, new_gen, new_gen->to); @@ -581,7 +584,7 @@ evacuate(StgClosure **p) const StgInfoTable *info; StgWord tag; - q = *p; + q = RELAXED_LOAD(p); loop: /* The tag and the pointer are split, to be merged after evacing */ @@ -638,10 +641,11 @@ loop: bd = Bdescr((P_)q); - if ((bd->flags & (BF_LARGE | BF_MARKED | BF_EVACUATED | BF_COMPACT | BF_NONMOVING)) != 0) { + uint16_t flags = RELAXED_LOAD(&bd->flags); + if ((flags & (BF_LARGE | BF_MARKED | BF_EVACUATED | BF_COMPACT | BF_NONMOVING)) != 0) { // Pointer to non-moving heap. Non-moving heap is collected using // mark-sweep so this object should be marked and then retained in sweep. - if (RTS_UNLIKELY(bd->flags & BF_NONMOVING)) { + if (RTS_UNLIKELY(RELAXED_LOAD(&bd->flags) & BF_NONMOVING)) { // NOTE: large objects in nonmoving heap are also marked with // BF_NONMOVING. Those are moved to scavenged_large_objects list in // mark phase. @@ -656,11 +660,11 @@ loop: // happen often, but allowing it makes certain things a bit // easier; e.g. scavenging an object is idempotent, so it's OK to // have an object on the mutable list multiple times. - if (bd->flags & BF_EVACUATED) { + if (flags & BF_EVACUATED) { // We aren't copying this object, so we have to check // whether it is already in the target generation. (this is // the write barrier). - if (bd->gen_no < gct->evac_gen_no) { + if (RELAXED_LOAD(&bd->gen_no) < gct->evac_gen_no) { gct->failed_to_evac = true; TICK_GC_FAILED_PROMOTION(); } @@ -671,20 +675,20 @@ loop: // right thing for objects that are half way in the middle of the first // block of a compact (and would be treated as large objects even though // they are not) - if (bd->flags & BF_COMPACT) { + if (flags & BF_COMPACT) { evacuate_compact((P_)q); return; } /* evacuate large objects by re-linking them onto a different list. */ - if (bd->flags & BF_LARGE) { + if (flags & BF_LARGE) { evacuate_large((P_)q); // We may have evacuated the block to the nonmoving generation. If so // we need to make sure it is added to the mark queue since the only // reference to it may be from the moving heap. - if (major_gc && bd->flags & BF_NONMOVING && !deadlock_detect_gc) { + if (major_gc && flags & BF_NONMOVING && !deadlock_detect_gc) { markQueuePushClosureGC(&gct->cap->upd_rem_set.queue, q); } return; @@ -702,7 +706,7 @@ loop: gen_no = bd->dest_no; - info = q->header.info; + info = ACQUIRE_LOAD(&q->header.info); if (IS_FORWARDING_PTR(info)) { /* Already evacuated, just return the forwarding address. @@ -722,9 +726,12 @@ loop: * check if gen is too low. */ StgClosure *e = (StgClosure*)UN_FORWARDING_PTR(info); - *p = TAG_CLOSURE(tag,e); + RELAXED_STORE(p, TAG_CLOSURE(tag,e)); if (gen_no < gct->evac_gen_no) { // optimisation - if (Bdescr((P_)e)->gen_no < gct->evac_gen_no) { + // The ACQUIRE here is necessary to ensure that we see gen_no if the + // evacuted object lives in a block newly-allocated by a GC thread on + // another core. + if (ACQUIRE_LOAD(&Bdescr((P_)e)->gen_no) < gct->evac_gen_no) { gct->failed_to_evac = true; TICK_GC_FAILED_PROMOTION(); } @@ -752,15 +759,17 @@ loop: if (info == Czh_con_info && // unsigned, so always true: (StgChar)w >= MIN_CHARLIKE && (StgChar)w <= MAX_CHARLIKE) { - *p = TAG_CLOSURE(tag, - (StgClosure *)CHARLIKE_CLOSURE((StgChar)w) - ); + RELAXED_STORE(p, \ + TAG_CLOSURE(tag, \ + (StgClosure *)CHARLIKE_CLOSURE((StgChar)w) + )); } else if (info == Izh_con_info && (StgInt)w >= MIN_INTLIKE && (StgInt)w <= MAX_INTLIKE) { - *p = TAG_CLOSURE(tag, - (StgClosure *)INTLIKE_CLOSURE((StgInt)w) - ); + RELAXED_STORE(p, \ + TAG_CLOSURE(tag, \ + (StgClosure *)INTLIKE_CLOSURE((StgInt)w) + )); } else { copy_tag_nolock(p,info,q,sizeofW(StgHeader)+1,gen_no,tag); @@ -814,10 +823,10 @@ loop: const StgInfoTable *i; r = ((StgInd*)q)->indirectee; if (GET_CLOSURE_TAG(r) == 0) { - i = r->header.info; + i = ACQUIRE_LOAD(&r->header.info); if (IS_FORWARDING_PTR(i)) { r = (StgClosure *)UN_FORWARDING_PTR(i); - i = r->header.info; + i = ACQUIRE_LOAD(&r->header.info); } if (i == &stg_TSO_info || i == &stg_WHITEHOLE_info @@ -842,7 +851,7 @@ loop: ASSERT(i != &stg_IND_info); } q = r; - *p = r; + RELEASE_STORE(p, r); goto loop; } @@ -868,8 +877,8 @@ loop: case IND: // follow chains of indirections, don't evacuate them - q = ((StgInd*)q)->indirectee; - *p = q; + q = RELAXED_LOAD(&((StgInd*)q)->indirectee); + RELAXED_STORE(p, q); goto loop; case RET_BCO: @@ -983,11 +992,12 @@ evacuate_BLACKHOLE(StgClosure **p) ASSERT(GET_CLOSURE_TAG(q) == 0); bd = Bdescr((P_)q); + const uint16_t flags = RELAXED_LOAD(&bd->flags); // blackholes can't be in a compact - ASSERT((bd->flags & BF_COMPACT) == 0); + ASSERT((flags & BF_COMPACT) == 0); - if (RTS_UNLIKELY(bd->flags & BF_NONMOVING)) { + if (RTS_UNLIKELY(RELAXED_LOAD(&bd->flags) & BF_NONMOVING)) { if (major_gc && !deadlock_detect_gc) markQueuePushClosureGC(&gct->cap->upd_rem_set.queue, q); return; @@ -996,18 +1006,18 @@ evacuate_BLACKHOLE(StgClosure **p) // blackholes *can* be in a large object: when raiseAsync() creates an // AP_STACK the payload might be large enough to create a large object. // See #14497. - if (bd->flags & BF_LARGE) { + if (flags & BF_LARGE) { evacuate_large((P_)q); return; } - if (bd->flags & BF_EVACUATED) { + if (flags & BF_EVACUATED) { if (bd->gen_no < gct->evac_gen_no) { gct->failed_to_evac = true; TICK_GC_FAILED_PROMOTION(); } return; } - if (bd->flags & BF_MARKED) { + if (flags & BF_MARKED) { if (!is_marked((P_)q,bd)) { mark((P_)q,bd); push_mark_stack((P_)q); @@ -1015,13 +1025,13 @@ evacuate_BLACKHOLE(StgClosure **p) return; } gen_no = bd->dest_no; - info = q->header.info; + info = ACQUIRE_LOAD(&q->header.info); if (IS_FORWARDING_PTR(info)) { StgClosure *e = (StgClosure*)UN_FORWARDING_PTR(info); *p = e; if (gen_no < gct->evac_gen_no) { // optimisation - if (Bdescr((P_)e)->gen_no < gct->evac_gen_no) { + if (ACQUIRE_LOAD(&Bdescr((P_)e)->gen_no) < gct->evac_gen_no) { gct->failed_to_evac = true; TICK_GC_FAILED_PROMOTION(); } @@ -1090,13 +1100,11 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val) // XXX we do not have BLACKHOLEs any more; replace with // a THUNK_SELECTOR again. This will go into a loop if it is // entered, and should result in a NonTermination exception. - ((StgThunk *)p)->payload[0] = val; - write_barrier(); - SET_INFO((StgClosure *)p, &stg_sel_0_upd_info); + RELAXED_STORE(&((StgThunk *)p)->payload[0], val); + SET_INFO_RELEASE((StgClosure *)p, &stg_sel_0_upd_info); } else { - ((StgInd *)p)->indirectee = val; - write_barrier(); - SET_INFO((StgClosure *)p, &stg_IND_info); + RELAXED_STORE(&((StgInd *)p)->indirectee, val); + SET_INFO_RELEASE((StgClosure *)p, &stg_IND_info); } // For the purposes of LDV profiling, we have created an @@ -1143,7 +1151,7 @@ selector_chain: // save any space in any case, and updating with an indirection is // trickier in a non-collected gen: we would have to update the // mutable list. - if (bd->flags & (BF_EVACUATED | BF_NONMOVING)) { + if (RELAXED_LOAD(&bd->flags) & (BF_EVACUATED | BF_NONMOVING)) { unchain_thunk_selectors(prev_thunk_selector, (StgClosure *)p); *q = (StgClosure *)p; // shortcut, behave as for: if (evac) evacuate(q); @@ -1198,8 +1206,7 @@ selector_chain: // need the write-barrier stuff. // - undo the chain we've built to point to p. SET_INFO((StgClosure *)p, (const StgInfoTable *)info_ptr); - write_barrier(); - *q = (StgClosure *)p; + RELEASE_STORE(q, (StgClosure *) p); if (evac) evacuate(q); unchain_thunk_selectors(prev_thunk_selector, (StgClosure *)p); return; @@ -1225,7 +1232,7 @@ selector_loop: // from-space during marking, for example. We rely on the property // that evacuate() doesn't mind if it gets passed a to-space pointer. - info = (StgInfoTable*)selectee->header.info; + info = RELAXED_LOAD((StgInfoTable**) &selectee->header.info); if (IS_FORWARDING_PTR(info)) { // We don't follow pointers into to-space; the constructor @@ -1252,7 +1259,7 @@ selector_loop: info->layout.payload.nptrs)); // Select the right field from the constructor - StgClosure *val = selectee->payload[field]; + StgClosure *val = RELAXED_LOAD(&selectee->payload[field]); #if defined(PROFILING) // For the purposes of LDV profiling, we have destroyed @@ -1278,19 +1285,19 @@ selector_loop: // evaluating until we find the real value, and then // update the whole chain to point to the value. val_loop: - info_ptr = (StgWord)UNTAG_CLOSURE(val)->header.info; + info_ptr = ACQUIRE_LOAD((StgWord*) &UNTAG_CLOSURE(val)->header.info); if (!IS_FORWARDING_PTR(info_ptr)) { info = INFO_PTR_TO_STRUCT((StgInfoTable *)info_ptr); switch (info->type) { case IND: case IND_STATIC: - val = ((StgInd *)val)->indirectee; + val = RELAXED_LOAD(&((StgInd *)val)->indirectee); goto val_loop; case THUNK_SELECTOR: // Use payload to make a list of thunk selectors, to be // used in unchain_thunk_selectors - ((StgClosure*)p)->payload[0] = (StgClosure *)prev_thunk_selector; + RELAXED_STORE(&((StgClosure*)p)->payload[0], (StgClosure *)prev_thunk_selector); prev_thunk_selector = p; p = (StgSelector*)val; goto selector_chain; @@ -1298,7 +1305,7 @@ selector_loop: break; } } - ((StgClosure*)p)->payload[0] = (StgClosure *)prev_thunk_selector; + RELAXED_STORE(&((StgClosure*)p)->payload[0], (StgClosure *)prev_thunk_selector); prev_thunk_selector = p; *q = val; @@ -1320,22 +1327,22 @@ selector_loop: case IND: case IND_STATIC: // Again, we might need to untag a constructor. - selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee ); + selectee = UNTAG_CLOSURE( RELAXED_LOAD(&((StgInd *)selectee)->indirectee) ); goto selector_loop; case BLACKHOLE: { StgClosure *r; const StgInfoTable *i; - r = ((StgInd*)selectee)->indirectee; + r = ACQUIRE_LOAD(&((StgInd*)selectee)->indirectee); // establish whether this BH has been updated, and is now an // indirection, as in evacuate(). if (GET_CLOSURE_TAG(r) == 0) { - i = r->header.info; + i = ACQUIRE_LOAD(&r->header.info); if (IS_FORWARDING_PTR(i)) { r = (StgClosure *)UN_FORWARDING_PTR(i); - i = r->header.info; + i = RELAXED_LOAD(&r->header.info); } if (i == &stg_TSO_info || i == &stg_WHITEHOLE_info @@ -1346,7 +1353,7 @@ selector_loop: ASSERT(i != &stg_IND_info); } - selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee ); + selectee = UNTAG_CLOSURE( RELAXED_LOAD(&((StgInd *)selectee)->indirectee) ); goto selector_loop; } diff --git a/rts/sm/GC.c b/rts/sm/GC.c index 0fa927f2ad..8a8acb1b53 100644 --- a/rts/sm/GC.c +++ b/rts/sm/GC.c @@ -112,14 +112,8 @@ static W_ g0_pcnt_kept = 30; // percentage of g0 live at last minor GC /* Mut-list stats */ #if defined(DEBUG) -uint32_t mutlist_MUTVARS, - mutlist_MUTARRS, - mutlist_MVARS, - mutlist_TVAR, - mutlist_TVAR_WATCH_QUEUE, - mutlist_TREC_CHUNK, - mutlist_TREC_HEADER, - mutlist_OTHERS; +// For lack of a better option we protect mutlist_scav_stats with oldest_gen->sync +MutListScavStats mutlist_scav_stats; #endif /* Thread-local data for each GC thread @@ -184,6 +178,36 @@ bdescr *mark_stack_top_bd; // topmost block in the mark stack bdescr *mark_stack_bd; // current block in the mark stack StgPtr mark_sp; // pointer to the next unallocated mark stack entry + +/* ----------------------------------------------------------------------------- + Statistics from mut_list scavenging + -------------------------------------------------------------------------- */ + +#if defined(DEBUG) +void +zeroMutListScavStats(MutListScavStats *src) +{ + memset(src, 0, sizeof(MutListScavStats)); +} + +void +addMutListScavStats(const MutListScavStats *src, + MutListScavStats *dest) +{ +#define ADD_STATS(field) dest->field += src->field; + ADD_STATS(n_MUTVAR); + ADD_STATS(n_MUTARR); + ADD_STATS(n_MVAR); + ADD_STATS(n_TVAR); + ADD_STATS(n_TREC_CHUNK); + ADD_STATS(n_TVAR_WATCH_QUEUE); + ADD_STATS(n_TREC_HEADER); + ADD_STATS(n_OTHERS); +#undef ADD_STATS +} +#endif /* DEBUG */ + + /* ----------------------------------------------------------------------------- GarbageCollect: the main entry point to the garbage collector. @@ -250,14 +274,7 @@ GarbageCollect (uint32_t collect_gen, stablePtrLock(); #if defined(DEBUG) - mutlist_MUTVARS = 0; - mutlist_MUTARRS = 0; - mutlist_MVARS = 0; - mutlist_TVAR = 0; - mutlist_TVAR_WATCH_QUEUE = 0; - mutlist_TREC_CHUNK = 0; - mutlist_TREC_HEADER = 0; - mutlist_OTHERS = 0; + zeroMutListScavStats(&mutlist_scav_stats); #endif // attribute any costs to CCS_GC @@ -520,37 +537,37 @@ GarbageCollect (uint32_t collect_gen, const gc_thread* thread; for (i=0; i < n_gc_threads; i++) { - copied += gc_threads[i]->copied; + copied += RELAXED_LOAD(&gc_threads[i]->copied); } for (i=0; i < n_gc_threads; i++) { thread = gc_threads[i]; if (n_gc_threads > 1) { debugTrace(DEBUG_gc,"thread %d:", i); debugTrace(DEBUG_gc," copied %ld", - thread->copied * sizeof(W_)); + RELAXED_LOAD(&thread->copied) * sizeof(W_)); debugTrace(DEBUG_gc," scanned %ld", - thread->scanned * sizeof(W_)); + RELAXED_LOAD(&thread->scanned) * sizeof(W_)); debugTrace(DEBUG_gc," any_work %ld", - thread->any_work); + RELAXED_LOAD(&thread->any_work)); debugTrace(DEBUG_gc," no_work %ld", - thread->no_work); + RELAXED_LOAD(&thread->no_work)); debugTrace(DEBUG_gc," scav_find_work %ld", - thread->scav_find_work); + RELAXED_LOAD(&thread->scav_find_work)); #if defined(THREADED_RTS) && defined(PROF_SPIN) - gc_spin_spin += thread->gc_spin.spin; - gc_spin_yield += thread->gc_spin.yield; - mut_spin_spin += thread->mut_spin.spin; - mut_spin_yield += thread->mut_spin.yield; + gc_spin_spin += RELAXED_LOAD(&thread->gc_spin.spin); + gc_spin_yield += RELAXED_LOAD(&thread->gc_spin.yield); + mut_spin_spin += RELAXED_LOAD(&thread->mut_spin.spin); + mut_spin_yield += RELAXED_LOAD(&thread->mut_spin.yield); #endif - any_work += thread->any_work; - no_work += thread->no_work; - scav_find_work += thread->scav_find_work; + any_work += RELAXED_LOAD(&thread->any_work); + no_work += RELAXED_LOAD(&thread->no_work); + scav_find_work += RELAXED_LOAD(&thread->scav_find_work); - par_max_copied = stg_max(gc_threads[i]->copied, par_max_copied); + par_max_copied = stg_max(RELAXED_LOAD(&thread->copied), par_max_copied); par_balanced_copied_acc += - stg_min(n_gc_threads * gc_threads[i]->copied, copied); + stg_min(n_gc_threads * RELAXED_LOAD(&thread->copied), copied); } } if (n_gc_threads > 1) { @@ -590,10 +607,14 @@ GarbageCollect (uint32_t collect_gen, debugTrace(DEBUG_gc, "mut_list_size: %lu (%d vars, %d arrays, %d MVARs, %d TVARs, %d TVAR_WATCH_QUEUEs, %d TREC_CHUNKs, %d TREC_HEADERs, %d others)", (unsigned long)(mut_list_size * sizeof(W_)), - mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, - mutlist_TVAR, mutlist_TVAR_WATCH_QUEUE, - mutlist_TREC_CHUNK, mutlist_TREC_HEADER, - mutlist_OTHERS); + mutlist_scav_stats.n_MUTVAR, + mutlist_scav_stats.n_MUTARR, + mutlist_scav_stats.n_MVAR, + mutlist_scav_stats.n_TVAR, + mutlist_scav_stats.n_TVAR_WATCH_QUEUE, + mutlist_scav_stats.n_TREC_CHUNK, + mutlist_scav_stats.n_TREC_HEADER, + mutlist_scav_stats.n_OTHERS); } bdescr *next, *prev; @@ -1109,7 +1130,7 @@ inc_running (void) static StgWord dec_running (void) { - ASSERT(gc_running_threads != 0); + ASSERT(RELAXED_LOAD(&gc_running_threads) != 0); return atomic_dec(&gc_running_threads); } @@ -1119,7 +1140,7 @@ any_work (void) int g; gen_workspace *ws; - gct->any_work++; + NONATOMIC_ADD(&gct->any_work, 1); write_barrier(); @@ -1152,7 +1173,7 @@ any_work (void) } #endif - gct->no_work++; + __atomic_fetch_add(&gct->no_work, 1, __ATOMIC_RELAXED); #if defined(THREADED_RTS) yieldThread(); #endif @@ -1193,7 +1214,7 @@ loop: debugTrace(DEBUG_gc, "%d GC threads still running", r); - while (gc_running_threads != 0) { + while (SEQ_CST_LOAD(&gc_running_threads) != 0) { // usleep(1); if (any_work()) { inc_running(); @@ -1230,7 +1251,7 @@ gcWorkerThread (Capability *cap) // measurements more accurate on Linux, perhaps because it syncs // the CPU time across the multiple cores. Without this, CPU time // is heavily skewed towards GC rather than MUT. - gct->wakeup = GC_THREAD_STANDING_BY; + SEQ_CST_STORE(&gct->wakeup, GC_THREAD_STANDING_BY); debugTrace(DEBUG_gc, "GC thread %d standing by...", gct->thread_index); ACQUIRE_SPIN_LOCK(&gct->gc_spin); @@ -1257,10 +1278,13 @@ gcWorkerThread (Capability *cap) // Wait until we're told to continue RELEASE_SPIN_LOCK(&gct->gc_spin); - gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE; debugTrace(DEBUG_gc, "GC thread %d waiting to continue...", gct->thread_index); stat_endGCWorker (cap, gct); + // This must come *after* stat_endGCWorker since it serves to + // synchronize us with the GC leader, which will later aggregate the + // GC statistics. + SEQ_CST_STORE(&gct->wakeup, GC_THREAD_WAITING_TO_CONTINUE); ACQUIRE_SPIN_LOCK(&gct->mut_spin); debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index); @@ -1285,7 +1309,7 @@ waitForGcThreads (Capability *cap USED_IF_THREADS, bool idle_cap[]) while(retry) { for (i=0; i < n_threads; i++) { if (i == me || idle_cap[i]) continue; - if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) { + if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY) { prodCapability(capabilities[i], cap->running_task); } } @@ -1295,7 +1319,7 @@ waitForGcThreads (Capability *cap USED_IF_THREADS, bool idle_cap[]) if (i == me || idle_cap[i]) continue; write_barrier(); interruptCapability(capabilities[i]); - if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) { + if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY) { retry = true; } } @@ -1352,10 +1376,10 @@ wakeup_gc_threads (uint32_t me USED_IF_THREADS, if (i == me || idle_cap[i]) continue; inc_running(); debugTrace(DEBUG_gc, "waking up gc thread %d", i); - if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) + if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY) barf("wakeup_gc_threads"); - gc_threads[i]->wakeup = GC_THREAD_RUNNING; + SEQ_CST_STORE(&gc_threads[i]->wakeup, GC_THREAD_RUNNING); ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin); RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin); } @@ -1376,9 +1400,8 @@ shutdown_gc_threads (uint32_t me USED_IF_THREADS, for (i=0; i < n_gc_threads; i++) { if (i == me || idle_cap[i]) continue; - while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) { + while (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_WAITING_TO_CONTINUE) { busy_wait_nop(); - write_barrier(); } } #endif @@ -1393,10 +1416,10 @@ releaseGCThreads (Capability *cap USED_IF_THREADS, bool idle_cap[]) uint32_t i; for (i=0; i < n_threads; i++) { if (i == me || idle_cap[i]) continue; - if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) + if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_WAITING_TO_CONTINUE) barf("releaseGCThreads"); - gc_threads[i]->wakeup = GC_THREAD_INACTIVE; + SEQ_CST_STORE(&gc_threads[i]->wakeup, GC_THREAD_INACTIVE); ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin); RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin); } @@ -1412,7 +1435,7 @@ static void stash_mut_list (Capability *cap, uint32_t gen_no) { cap->saved_mut_lists[gen_no] = cap->mut_lists[gen_no]; - cap->mut_lists[gen_no] = allocBlockOnNode_sync(cap->node); + RELEASE_STORE(&cap->mut_lists[gen_no], allocBlockOnNode_sync(cap->node)); } /* ---------------------------------------------------------------------------- @@ -1438,9 +1461,11 @@ prepare_collected_gen (generation *gen) // mutable list always has at least one block; this means we can avoid // a check for NULL in recordMutable(). for (i = 0; i < n_capabilities; i++) { - freeChain(capabilities[i]->mut_lists[g]); - capabilities[i]->mut_lists[g] = - allocBlockOnNode(capNoToNumaNode(i)); + bdescr *old = RELAXED_LOAD(&capabilities[i]->mut_lists[g]); + freeChain(old); + + bdescr *new = allocBlockOnNode(capNoToNumaNode(i)); + RELAXED_STORE(&capabilities[i]->mut_lists[g], new); } } @@ -1654,7 +1679,7 @@ collect_pinned_object_blocks (void) bdescr *last = NULL; if (use_nonmoving && gen == oldest_gen) { // Mark objects as belonging to the nonmoving heap - for (bdescr *bd = capabilities[n]->pinned_object_blocks; bd != NULL; bd = bd->link) { + for (bdescr *bd = RELAXED_LOAD(&capabilities[n]->pinned_object_blocks); bd != NULL; bd = bd->link) { bd->flags |= BF_NONMOVING; bd->gen = oldest_gen; bd->gen_no = oldest_gen->no; @@ -1673,8 +1698,8 @@ collect_pinned_object_blocks (void) if (gen->large_objects != NULL) { gen->large_objects->u.back = last; } - gen->large_objects = capabilities[n]->pinned_object_blocks; - capabilities[n]->pinned_object_blocks = NULL; + g0->large_objects = RELAXED_LOAD(&capabilities[n]->pinned_object_blocks); + RELAXED_STORE(&capabilities[n]->pinned_object_blocks, NULL); } } } diff --git a/rts/sm/GC.h b/rts/sm/GC.h index bde006913b..c5d5f6ac81 100644 --- a/rts/sm/GC.h +++ b/rts/sm/GC.h @@ -42,20 +42,32 @@ extern StgPtr mark_sp; extern bool work_stealing; -#if defined(DEBUG) -extern uint32_t mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS, - mutlist_TVAR, - mutlist_TVAR_WATCH_QUEUE, - mutlist_TREC_CHUNK, - mutlist_TREC_HEADER; -#endif - #if defined(PROF_SPIN) && defined(THREADED_RTS) extern volatile StgWord64 whitehole_gc_spin; extern volatile StgWord64 waitForGcThreads_spin; extern volatile StgWord64 waitForGcThreads_yield; #endif +// mutable list scavenging statistics +#if defined(DEBUG) +typedef struct { + StgWord n_MUTVAR; + StgWord n_MUTARR; + StgWord n_MVAR; + StgWord n_TVAR; + StgWord n_TREC_CHUNK; + StgWord n_TVAR_WATCH_QUEUE; + StgWord n_TREC_HEADER; + StgWord n_OTHERS; +} MutListScavStats; + +extern MutListScavStats mutlist_scav_stats; + +void zeroMutListScavStats(MutListScavStats *src); +void addMutListScavStats(const MutListScavStats *src, + MutListScavStats *dest); +#endif /* DEBUG */ + void gcWorkerThread (Capability *cap); void initGcThreads (uint32_t from, uint32_t to); void freeGcThreads (void); diff --git a/rts/sm/GCAux.c b/rts/sm/GCAux.c index 11080c1f22..55b4f99596 100644 --- a/rts/sm/GCAux.c +++ b/rts/sm/GCAux.c @@ -83,7 +83,7 @@ isAlive(StgClosure *p) return p; } - info = q->header.info; + info = RELAXED_LOAD(&q->header.info); if (IS_FORWARDING_PTR(info)) { // alive! diff --git a/rts/sm/GCUtils.c b/rts/sm/GCUtils.c index 02c26ddf5e..d58fdc48ae 100644 --- a/rts/sm/GCUtils.c +++ b/rts/sm/GCUtils.c @@ -249,8 +249,8 @@ todo_block_full (uint32_t size, gen_workspace *ws) return p; } - gct->copied += ws->todo_free - bd->free; - bd->free = ws->todo_free; + gct->copied += ws->todo_free - RELAXED_LOAD(&bd->free); + RELAXED_STORE(&bd->free, ws->todo_free); ASSERT(bd->u.scan >= bd->start && bd->u.scan <= bd->free); @@ -330,10 +330,11 @@ alloc_todo_block (gen_workspace *ws, uint32_t size) gct->free_blocks = bd->link; } } - // blocks in to-space get the BF_EVACUATED flag. - bd->flags = BF_EVACUATED; - bd->u.scan = bd->start; initBdescr(bd, ws->gen, ws->gen->to); + RELAXED_STORE(&bd->u.scan, RELAXED_LOAD(&bd->start)); + // blocks in to-space get the BF_EVACUATED flag. + // RELEASE here to ensure that bd->gen is visible to other cores. + RELEASE_STORE(&bd->flags, BF_EVACUATED); } bd->link = NULL; @@ -345,7 +346,7 @@ alloc_todo_block (gen_workspace *ws, uint32_t size) // See Note [big objects] debugTrace(DEBUG_gc, "alloc new todo block %p for gen %d", - bd->free, ws->gen->no); + RELAXED_LOAD(&bd->free), ws->gen->no); return ws->todo_free; } diff --git a/rts/sm/GCUtils.h b/rts/sm/GCUtils.h index a71d6dcb92..798a795deb 100644 --- a/rts/sm/GCUtils.h +++ b/rts/sm/GCUtils.h @@ -67,7 +67,9 @@ recordMutableGen_GC (StgClosure *p, uint32_t gen_no) bd = new_bd; gct->mut_lists[gen_no] = bd; } - *bd->free++ = (StgWord)p; + *bd->free++ = (StgWord) p; + // N.B. we are allocating into our Capability-local mut_list, therefore + // we don't need an atomic increment. } #include "EndPrivate.h" diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c index 65b1338f10..b8d120823c 100644 --- a/rts/sm/MarkWeak.c +++ b/rts/sm/MarkWeak.c @@ -414,14 +414,13 @@ markWeakPtrList ( void ) StgWeak *w, **last_w; last_w = &gen->weak_ptr_list; - for (w = gen->weak_ptr_list; w != NULL; w = w->link) { + for (w = gen->weak_ptr_list; w != NULL; w = RELAXED_LOAD(&w->link)) { // w might be WEAK, EVACUATED, or DEAD_WEAK (actually CON_STATIC) here #if defined(DEBUG) { // careful to do this assertion only reading the info ptr // once, because during parallel GC it might change under our feet. - const StgInfoTable *info; - info = w->header.info; + const StgInfoTable *info = RELAXED_LOAD(&w->header.info); ASSERT(IS_FORWARDING_PTR(info) || info == &stg_DEAD_WEAK_info || INFO_PTR_TO_STRUCT(info)->type == WEAK); diff --git a/rts/sm/NonMoving.c b/rts/sm/NonMoving.c index 388ceae2fd..3eafd6be98 100644 --- a/rts/sm/NonMoving.c +++ b/rts/sm/NonMoving.c @@ -726,6 +726,7 @@ void nonmovingStop(void) "waiting for nonmoving collector thread to terminate"); ACQUIRE_LOCK(&concurrent_coll_finished_lock); waitCondition(&concurrent_coll_finished, &concurrent_coll_finished_lock); + joinOSThread(mark_thread); } #endif } diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c index dd9a96adf8..9fe2c6006e 100644 --- a/rts/sm/Scav.c +++ b/rts/sm/Scav.c @@ -65,6 +65,8 @@ #include "sm/NonMoving.h" // for nonmoving_set_closure_mark_bit #include "sm/NonMovingScav.h" +#include <string.h> /* for memset */ + static void scavenge_large_bitmap (StgPtr p, StgLargeBitmap *large_bitmap, StgWord size ); @@ -201,9 +203,9 @@ scavenge_compact(StgCompactNFData *str) gct->eager_promotion = saved_eager; if (gct->failed_to_evac) { - ((StgClosure *)str)->header.info = &stg_COMPACT_NFDATA_DIRTY_info; + RELEASE_STORE(&((StgClosure *)str)->header.info, &stg_COMPACT_NFDATA_DIRTY_info); } else { - ((StgClosure *)str)->header.info = &stg_COMPACT_NFDATA_CLEAN_info; + RELEASE_STORE(&((StgClosure *)str)->header.info, &stg_COMPACT_NFDATA_CLEAN_info); } } @@ -464,9 +466,9 @@ scavenge_block (bdescr *bd) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - mvar->header.info = &stg_MVAR_DIRTY_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info); } else { - mvar->header.info = &stg_MVAR_CLEAN_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info); } p += sizeofW(StgMVar); break; @@ -481,9 +483,9 @@ scavenge_block (bdescr *bd) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - tvar->header.info = &stg_TVAR_DIRTY_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info); } else { - tvar->header.info = &stg_TVAR_CLEAN_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info); } p += sizeofW(StgTVar); break; @@ -615,9 +617,9 @@ scavenge_block (bdescr *bd) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_CLEAN_info); } p += sizeofW(StgMutVar); break; @@ -634,9 +636,9 @@ scavenge_block (bdescr *bd) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info); } else { - bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info); } p += sizeofW(StgBlockingQueue); break; @@ -686,9 +688,9 @@ scavenge_block (bdescr *bd) p = scavenge_mut_arr_ptrs((StgMutArrPtrs*)p); if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info); } gct->eager_promotion = saved_eager_promotion; @@ -703,9 +705,9 @@ scavenge_block (bdescr *bd) p = scavenge_mut_arr_ptrs((StgMutArrPtrs*)p); if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -728,9 +730,9 @@ scavenge_block (bdescr *bd) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info); } gct->failed_to_evac = true; // always put it on the mutable list. @@ -749,9 +751,9 @@ scavenge_block (bdescr *bd) } if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -834,7 +836,7 @@ scavenge_block (bdescr *bd) if (p > bd->free) { gct->copied += ws->todo_free - bd->free; - bd->free = p; + RELEASE_STORE(&bd->free, p); } debugTrace(DEBUG_gc, " scavenged %ld bytes", @@ -889,9 +891,9 @@ scavenge_mark_stack(void) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - mvar->header.info = &stg_MVAR_DIRTY_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info); } else { - mvar->header.info = &stg_MVAR_CLEAN_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info); } break; } @@ -905,9 +907,9 @@ scavenge_mark_stack(void) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - tvar->header.info = &stg_TVAR_DIRTY_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info); } else { - tvar->header.info = &stg_TVAR_CLEAN_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info); } break; } @@ -1011,9 +1013,9 @@ scavenge_mark_stack(void) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_CLEAN_info); } break; } @@ -1030,9 +1032,9 @@ scavenge_mark_stack(void) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info); } else { - bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info); } break; } @@ -1078,9 +1080,9 @@ scavenge_mark_stack(void) scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info); } gct->eager_promotion = saved_eager_promotion; @@ -1097,9 +1099,9 @@ scavenge_mark_stack(void) scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -1124,9 +1126,9 @@ scavenge_mark_stack(void) gct->eager_promotion = saved_eager; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info); } gct->failed_to_evac = true; // mutable anyhow. @@ -1145,9 +1147,9 @@ scavenge_mark_stack(void) } if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -1251,9 +1253,9 @@ scavenge_one(StgPtr p) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - mvar->header.info = &stg_MVAR_DIRTY_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info); } else { - mvar->header.info = &stg_MVAR_CLEAN_info; + RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info); } break; } @@ -1267,9 +1269,9 @@ scavenge_one(StgPtr p) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - tvar->header.info = &stg_TVAR_DIRTY_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info); } else { - tvar->header.info = &stg_TVAR_CLEAN_info; + RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info); } break; } @@ -1331,9 +1333,9 @@ scavenge_one(StgPtr p) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_MUT_VAR_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_MUT_VAR_CLEAN_info); } break; } @@ -1350,9 +1352,9 @@ scavenge_one(StgPtr p) gct->eager_promotion = saved_eager_promotion; if (gct->failed_to_evac) { - bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info); } else { - bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info; + RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info); } break; } @@ -1398,9 +1400,9 @@ scavenge_one(StgPtr p) scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); if (gct->failed_to_evac) { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info); } gct->eager_promotion = saved_eager_promotion; @@ -1415,9 +1417,9 @@ scavenge_one(StgPtr p) scavenge_mut_arr_ptrs((StgMutArrPtrs *)p); if (gct->failed_to_evac) { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -1442,9 +1444,9 @@ scavenge_one(StgPtr p) gct->eager_promotion = saved_eager; if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info); } gct->failed_to_evac = true; @@ -1463,9 +1465,9 @@ scavenge_one(StgPtr p) } if (gct->failed_to_evac) { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info); } else { - ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info; + RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info); } break; } @@ -1583,6 +1585,10 @@ static void scavenge_mutable_list(bdescr *bd, generation *gen) { StgPtr p, q; +#if defined(DEBUG) + MutListScavStats stats; // Local accumulator + zeroMutListScavStats(&stats); +#endif uint32_t gen_no = gen->no; gct->evac_gen_no = gen_no; @@ -1598,31 +1604,31 @@ scavenge_mutable_list(bdescr *bd, generation *gen) case MUT_VAR_CLEAN: // can happen due to concurrent writeMutVars case MUT_VAR_DIRTY: - mutlist_MUTVARS++; break; + stats.n_MUTVAR++; break; case MUT_ARR_PTRS_CLEAN: case MUT_ARR_PTRS_DIRTY: case MUT_ARR_PTRS_FROZEN_CLEAN: case MUT_ARR_PTRS_FROZEN_DIRTY: - mutlist_MUTARRS++; break; + stats.n_MUTARR++; break; case MVAR_CLEAN: barf("MVAR_CLEAN on mutable list"); case MVAR_DIRTY: - mutlist_MVARS++; break; + stats.n_MVAR++; break; case TVAR: - mutlist_TVAR++; break; + stats.n_TVAR++; break; case TREC_CHUNK: - mutlist_TREC_CHUNK++; break; + stats.n_TREC_CHUNK++; break; case MUT_PRIM: pinfo = ((StgClosure*)p)->header.info; if (pinfo == &stg_TVAR_WATCH_QUEUE_info) - mutlist_TVAR_WATCH_QUEUE++; + stats.n_TVAR_WATCH_QUEUE++; else if (pinfo == &stg_TREC_HEADER_info) - mutlist_TREC_HEADER++; + stats.n_TREC_HEADER++; else - mutlist_OTHERS++; + stats.n_OTHERS++; break; default: - mutlist_OTHERS++; break; + stats.n_OTHERS++; break; } #endif @@ -1647,9 +1653,9 @@ scavenge_mutable_list(bdescr *bd, generation *gen) scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p); if (gct->failed_to_evac) { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info); } else { - ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info; + RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info); } gct->eager_promotion = saved_eager_promotion; @@ -1671,6 +1677,13 @@ scavenge_mutable_list(bdescr *bd, generation *gen) } } } + +#if defined(DEBUG) + // For lack of a better option we protect mutlist_scav_stats with oldest_gen->sync + ACQUIRE_SPIN_LOCK(&oldest_gen->sync); + addMutListScavStats(&stats, &mutlist_scav_stats); + RELEASE_SPIN_LOCK(&oldest_gen->sync); +#endif } void @@ -1740,8 +1753,9 @@ scavenge_static(void) /* Take this object *off* the static_objects list, * and put it on the scavenged_static_objects list. */ - gct->static_objects = *STATIC_LINK(info,p); - *STATIC_LINK(info,p) = gct->scavenged_static_objects; + StgClosure **link = STATIC_LINK(info,p); + gct->static_objects = RELAXED_LOAD(link); + RELAXED_STORE(link, gct->scavenged_static_objects); gct->scavenged_static_objects = flagged_p; switch (info -> type) { diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c index c8c07db737..98aefa9a4b 100644 --- a/rts/sm/Storage.c +++ b/rts/sm/Storage.c @@ -445,7 +445,7 @@ lockCAF (StgRegTable *reg, StgIndStatic *caf) Capability *cap = regTableToCapability(reg); StgInd *bh; - orig_info = caf->header.info; + orig_info = RELAXED_LOAD(&caf->header.info); #if defined(THREADED_RTS) const StgInfoTable *cur_info; @@ -501,12 +501,11 @@ lockCAF (StgRegTable *reg, StgIndStatic *caf) } bh->indirectee = (StgClosure *)cap->r.rCurrentTSO; SET_HDR(bh, &stg_CAF_BLACKHOLE_info, caf->header.prof.ccs); - // Ensure that above writes are visible before we introduce reference as CAF indirectee. - write_barrier(); - caf->indirectee = (StgClosure *)bh; - write_barrier(); - SET_INFO((StgClosure*)caf,&stg_IND_STATIC_info); + // RELEASE ordering to ensure that above writes are visible before we + // introduce reference as CAF indirectee. + RELEASE_STORE(&caf->indirectee, (StgClosure *) bh); + SET_INFO_RELEASE((StgClosure*)caf, &stg_IND_STATIC_info); return bh; } @@ -1033,8 +1032,8 @@ allocateMightFail (Capability *cap, W_ n) g0->n_new_large_words += n; RELEASE_SM_LOCK; initBdescr(bd, g0, g0); - bd->flags = BF_LARGE; - bd->free = bd->start + n; + RELAXED_STORE(&bd->flags, BF_LARGE); + RELAXED_STORE(&bd->free, bd->start + n); cap->total_allocated += n; return bd->start; } @@ -1300,8 +1299,8 @@ dirty_MUT_VAR(StgRegTable *reg, StgMutVar *mvar, StgClosure *old) Capability *cap = regTableToCapability(reg); // No barrier required here as no other heap object fields are read. See // note [Heap memory barriers] in SMP.h. - if (mvar->header.info == &stg_MUT_VAR_CLEAN_info) { - mvar->header.info = &stg_MUT_VAR_DIRTY_info; + if (RELAXED_LOAD(&mvar->header.info) == &stg_MUT_VAR_CLEAN_info) { + SET_INFO((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info); recordClosureMutated(cap, (StgClosure *) mvar); IF_NONMOVING_WRITE_BARRIER_ENABLED { // See Note [Dirty flags in the non-moving collector] in NonMoving.c @@ -1323,8 +1322,8 @@ dirty_TVAR(Capability *cap, StgTVar *p, { // No barrier required here as no other heap object fields are read. See // note [Heap memory barriers] in SMP.h. - if (p->header.info == &stg_TVAR_CLEAN_info) { - p->header.info = &stg_TVAR_DIRTY_info; + if (RELAXED_LOAD(&p->header.info) == &stg_TVAR_CLEAN_info) { + SET_INFO((StgClosure*) p, &stg_TVAR_DIRTY_info); recordClosureMutated(cap,(StgClosure*)p); IF_NONMOVING_WRITE_BARRIER_ENABLED { // See Note [Dirty flags in the non-moving collector] in NonMoving.c @@ -1341,8 +1340,8 @@ dirty_TVAR(Capability *cap, StgTVar *p, void setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target) { - if (tso->dirty == 0) { - tso->dirty = 1; + if (RELAXED_LOAD(&tso->dirty) == 0) { + RELAXED_STORE(&tso->dirty, 1); recordClosureMutated(cap,(StgClosure*)tso); IF_NONMOVING_WRITE_BARRIER_ENABLED { updateRemembSetPushClosure(cap, (StgClosure *) tso->_link); @@ -1354,8 +1353,8 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target) void setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target) { - if (tso->dirty == 0) { - tso->dirty = 1; + if (RELAXED_LOAD(&tso->dirty) == 0) { + RELAXED_STORE(&tso->dirty, 1); recordClosureMutated(cap,(StgClosure*)tso); IF_NONMOVING_WRITE_BARRIER_ENABLED { updateRemembSetPushClosure(cap, (StgClosure *) tso->block_info.prev); @@ -1367,8 +1366,8 @@ setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target) void dirty_TSO (Capability *cap, StgTSO *tso) { - if (tso->dirty == 0) { - tso->dirty = 1; + if (RELAXED_LOAD(&tso->dirty) == 0) { + RELAXED_STORE(&tso->dirty, 1); recordClosureMutated(cap,(StgClosure*)tso); } @@ -1386,8 +1385,8 @@ dirty_STACK (Capability *cap, StgStack *stack) updateRemembSetPushStack(cap, stack); } - if (! (stack->dirty & STACK_DIRTY)) { - stack->dirty = STACK_DIRTY; + if (RELAXED_LOAD(&stack->dirty) == 0) { + RELAXED_STORE(&stack->dirty, 1); recordClosureMutated(cap,(StgClosure*)stack); } @@ -1562,10 +1561,13 @@ calcNeeded (bool force_major, memcount *blocks_needed) for (uint32_t g = 0; g < RtsFlags.GcFlags.generations; g++) { generation *gen = &generations[g]; - W_ blocks = gen->live_estimate ? (gen->live_estimate / BLOCK_SIZE_W) : gen->n_blocks; - blocks += gen->n_large_blocks - + gen->n_compact_blocks; + + // This can race with allocate() and compactAllocateBlockInternal() + // but only needs to be approximate + TSAN_ANNOTATE_BENIGN_RACE(&gen->n_large_blocks, "n_large_blocks"); + blocks += RELAXED_LOAD(&gen->n_large_blocks) + + RELAXED_LOAD(&gen->n_compact_blocks); // we need at least this much space needed += blocks; diff --git a/rts/sm/Storage.h b/rts/sm/Storage.h index 8d90c3ba5f..48ddcf35f5 100644 --- a/rts/sm/Storage.h +++ b/rts/sm/Storage.h @@ -72,8 +72,11 @@ bool getNewNursery (Capability *cap); INLINE_HEADER bool doYouWantToGC(Capability *cap) { + // This is necessarily approximate since otherwise we would need to take + // SM_LOCK to safely look at n_new_large_words. + TSAN_ANNOTATE_BENIGN_RACE(&g0->n_new_large_words, "doYouWantToGC(n_new_large_words)"); return ((cap->r.rCurrentNursery->link == NULL && !getNewNursery(cap)) || - g0->n_new_large_words >= large_alloc_lim); + RELAXED_LOAD(&g0->n_new_large_words) >= large_alloc_lim); } /* ----------------------------------------------------------------------------- @@ -91,7 +94,7 @@ INLINE_HEADER void finishedNurseryBlock (Capability *cap, bdescr *bd) { } INLINE_HEADER void newNurseryBlock (bdescr *bd) { - bd->free = bd->start; + RELAXED_STORE(&bd->free, bd->start); } void updateNurseriesStats (void); diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c index f3bdefd998..ed8a598e51 100644 --- a/rts/win32/OSThreads.c +++ b/rts/win32/OSThreads.c @@ -444,6 +444,15 @@ interruptOSThread (OSThreadId id) CloseHandle(hdl); } +void +joinOSThread (OSThreadId id) +{ + int ret = WaitForSingleObject(id, INFINITE); + if (ret != WAIT_OBJECT_0) { + sysErrorBelch("joinOSThread: error %d", ret); + } +} + void setThreadNode (uint32_t node) { if (osNumaAvailable()) diff --git a/testsuite/driver/testglobals.py b/testsuite/driver/testglobals.py index 30b457e829..aa3bc4c4ff 100644 --- a/testsuite/driver/testglobals.py +++ b/testsuite/driver/testglobals.py @@ -151,6 +151,9 @@ class TestConfig: # Is the compiler dynamically linked? self.ghc_dynamic = False + # Are we running in a ThreadSanitizer-instrumented build? + self.have_thread_sanitizer = False + # Do symbols use leading underscores? self.leading_underscore = False diff --git a/testsuite/driver/testlib.py b/testsuite/driver/testlib.py index 15bcdde0eb..1a1c908fc3 100644 --- a/testsuite/driver/testlib.py +++ b/testsuite/driver/testlib.py @@ -578,6 +578,9 @@ def have_slow_bignum( ) -> bool: def llvm_build ( ) -> bool: return config.ghc_built_by_llvm +def have_thread_sanitizer( ) -> bool: + return config.have_thread_sanitizer + # --- # Note [Measuring residency] @@ -629,6 +632,10 @@ def collect_compiler_residency(tolerance_pct: float): def high_memory_usage(name, opts): opts.alone = True + # ThreadSanitizer significantly increases memory footprint; skip + if have_thread_sanitizer(): + opts.skip = True + # If a test is for a multi-CPU race, then running the test alone # increases the chance that we'll actually see it. def multi_cpu_race(name, opts): diff --git a/testsuite/tests/concurrent/should_run/all.T b/testsuite/tests/concurrent/should_run/all.T index 9297c5890e..2b8375e61e 100644 --- a/testsuite/tests/concurrent/should_run/all.T +++ b/testsuite/tests/concurrent/should_run/all.T @@ -245,6 +245,7 @@ test('conc068', [ omit_ways(concurrent_ways), exit_code(1) ], compile_and_run, [ test('setnumcapabilities001', [ only_ways(['threaded1','threaded2', 'nonmoving_thr']), extra_run_opts('8 12 2000'), + when(have_thread_sanitizer(), expect_broken(18808)), req_smp ], compile_and_run, ['']) diff --git a/testsuite/tests/dynlibs/all.T b/testsuite/tests/dynlibs/all.T index b7272d4bac..092c983389 100644 --- a/testsuite/tests/dynlibs/all.T +++ b/testsuite/tests/dynlibs/all.T @@ -1,12 +1,18 @@ - -test('T3807', [req_shared_libs, when(opsys('mingw32'), skip)], makefile_test, []) +test('T3807', + [req_shared_libs, + when(have_thread_sanitizer(), expect_broken(18883)), + when(opsys('mingw32'), + skip)], makefile_test, []) test('T4464', [req_shared_libs, unless(opsys('mingw32'), skip)], makefile_test, []) test('T5373', [req_shared_libs], makefile_test, []) # It's not clear exactly what platforms we can expect this to succeed on. -test('T13702', unless(opsys('linux'), skip), makefile_test, []) +test('T13702', + [when(have_thread_sanitizer(), expect_broken(18884)), + unless(opsys('linux'), skip)], + makefile_test, []) # test that -shared and -flink-rts actually links the rts test('T18072', [req_shared_libs, unless(opsys('linux'), skip)], makefile_test, []) diff --git a/testsuite/tests/hiefile/should_compile/all.T b/testsuite/tests/hiefile/should_compile/all.T index a98a042ca0..489cff28d0 100644 --- a/testsuite/tests/hiefile/should_compile/all.T +++ b/testsuite/tests/hiefile/should_compile/all.T @@ -4,6 +4,7 @@ test('hie002', when(wordsize(32), skip), # No linting in perf tests: no_lint, + high_memory_usage, collect_compiler_stats('bytes allocated',10)], compile, ['-fno-code -fwrite-ide-info']) diff --git a/testsuite/tests/perf/compiler/all.T b/testsuite/tests/perf/compiler/all.T index 090dbb4acf..0b9cd2fec3 100644 --- a/testsuite/tests/perf/compiler/all.T +++ b/testsuite/tests/perf/compiler/all.T @@ -132,20 +132,23 @@ test('T9675', test('T9872a', [ only_ways(['normal']), - collect_compiler_stats('bytes allocated', 1) + collect_compiler_stats('bytes allocated', 1), + high_memory_usage ], compile_fail, ['']) test('T9872b', [ only_ways(['normal']), - collect_compiler_stats('bytes allocated', 1) + collect_compiler_stats('bytes allocated', 1), + high_memory_usage ], compile_fail, ['']) test('T9872c', [ only_ways(['normal']), - collect_compiler_stats('bytes allocated', 1) + collect_compiler_stats('bytes allocated', 1), + high_memory_usage ], compile_fail, ['']) diff --git a/testsuite/tests/rts/all.T b/testsuite/tests/rts/all.T index e469783e80..62c893cde1 100644 --- a/testsuite/tests/rts/all.T +++ b/testsuite/tests/rts/all.T @@ -38,6 +38,8 @@ test('derefnull', when(platform('i386-apple-darwin'), [ignore_stderr, exit_code(139)]), when(platform('x86_64-apple-darwin'), [ignore_stderr, exit_code(139)]), when(opsys('mingw32'), [ignore_stderr, exit_code(11)]), + # ThreadSanitizer changes the output + when(have_thread_sanitizer(), skip), # since these test are supposed to crash the # profile report will be empty always. # so disable the check for profiling @@ -64,6 +66,8 @@ test('divbyzero', # The output under OS X is too unstable to readily compare when(platform('i386-apple-darwin'), [ignore_stderr, exit_code(136)]), when(platform('x86_64-apple-darwin'), [ignore_stderr, exit_code(136)]), + # ThreadSanitizer changes the output + when(have_thread_sanitizer(), skip), # since these test are supposed to crash the # profile report will be empty always. # so disable the check for profiling |