summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorBen Gamari <ben@smart-cactus.org>2020-11-08 09:29:16 -0500
committerBen Gamari <ben@smart-cactus.org>2020-11-08 09:29:16 -0500
commit638f38c50e80a19275f3a06535a0dd8130a17a53 (patch)
treeac18855cd2f39544e4841866fbabb3f86a4d1f35 /rts
parentb1d2c1f3246b3740589a59bdf7648c13de47c32b (diff)
parent07e82ba52228580cfbd90ff031e657acbecc715b (diff)
downloadhaskell-638f38c50e80a19275f3a06535a0dd8130a17a53.tar.gz
Merge remote-tracking branch 'origin/wip/tsan/all'
Diffstat (limited to 'rts')
-rw-r--r--rts/.tsan-suppressions14
-rw-r--r--rts/Capability.c254
-rw-r--r--rts/Capability.h32
-rw-r--r--rts/Messages.c41
-rw-r--r--rts/Proftimer.c10
-rw-r--r--rts/RaiseAsync.c9
-rw-r--r--rts/RtsStartup.c12
-rw-r--r--rts/SMPClosureOps.h9
-rw-r--r--rts/STM.c92
-rw-r--r--rts/Schedule.c102
-rw-r--r--rts/Schedule.h15
-rw-r--r--rts/Sparks.c4
-rw-r--r--rts/SpinLock.c41
-rw-r--r--rts/StablePtr.c9
-rw-r--r--rts/Stats.c83
-rw-r--r--rts/Stats.h4
-rw-r--r--rts/Task.c7
-rw-r--r--rts/Task.h8
-rw-r--r--rts/ThreadPaused.c11
-rw-r--r--rts/Threads.c33
-rw-r--r--rts/Timer.c32
-rw-r--r--rts/Updates.h6
-rw-r--r--rts/WSDeque.c215
-rw-r--r--rts/WSDeque.h51
-rw-r--r--rts/Weak.c29
-rw-r--r--rts/posix/GetTime.c6
-rw-r--r--rts/posix/OSThreads.c14
-rw-r--r--rts/posix/Signals.c48
-rw-r--r--rts/posix/itimer/Pthread.c19
-rw-r--r--rts/rts.cabal.in13
-rw-r--r--rts/sm/BlockAlloc.c38
-rw-r--r--rts/sm/CNF.c1
-rw-r--r--rts/sm/Evac.c139
-rw-r--r--rts/sm/GC.c135
-rw-r--r--rts/sm/GC.h28
-rw-r--r--rts/sm/GCAux.c2
-rw-r--r--rts/sm/GCUtils.c13
-rw-r--r--rts/sm/GCUtils.h4
-rw-r--r--rts/sm/MarkWeak.c5
-rw-r--r--rts/sm/NonMoving.c1
-rw-r--r--rts/sm/Scav.c142
-rw-r--r--rts/sm/Storage.c50
-rw-r--r--rts/sm/Storage.h7
-rw-r--r--rts/win32/OSThreads.c9
44 files changed, 1054 insertions, 743 deletions
diff --git a/rts/.tsan-suppressions b/rts/.tsan-suppressions
new file mode 100644
index 0000000000..734faff5a6
--- /dev/null
+++ b/rts/.tsan-suppressions
@@ -0,0 +1,14 @@
+# ThreadSanitizer suppressions.
+# See Note [ThreadSanitizer] in includes/rts/TSANUtils.h.
+
+# This is a known race but is benign
+race:capability_is_busy
+
+# This is a benign race during IO manager shutdown (between ioManagerWakeup
+# and GHC.Event.Control.closeControl).
+race:ioManagerWakeup
+race:base_GHCziEventziControl_zdwcloseControl_info
+
+# This is a potentially problematic race which I have yet to work out
+# (#17289)
+race:handle_tick
diff --git a/rts/Capability.c b/rts/Capability.c
index 8dddce7028..a655fc7b3f 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -84,8 +84,8 @@ Capability * rts_unsafeGetMyCapability (void)
STATIC_INLINE bool
globalWorkToDo (void)
{
- return sched_state >= SCHED_INTERRUPTING
- || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
+ return RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING
+ || RELAXED_LOAD(&recent_activity) == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
@@ -211,7 +211,10 @@ newReturningTask (Capability *cap, Task *task)
cap->returning_tasks_hd = task;
}
cap->returning_tasks_tl = task;
- cap->n_returning_tasks++;
+
+ // See Note [Data race in shouldYieldCapability] in Schedule.c.
+ RELAXED_ADD(&cap->n_returning_tasks, 1);
+
ASSERT_RETURNING_TASKS(cap,task);
}
@@ -227,7 +230,10 @@ popReturningTask (Capability *cap)
cap->returning_tasks_tl = NULL;
}
task->next = NULL;
- cap->n_returning_tasks--;
+
+ // See Note [Data race in shouldYieldCapability] in Schedule.c.
+ RELAXED_ADD(&cap->n_returning_tasks, -1);
+
ASSERT_RETURNING_TASKS(cap,task);
return task;
}
@@ -306,6 +312,7 @@ initCapability (Capability *cap, uint32_t i)
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
cap->context_switch = 0;
+ cap->interrupt = 0;
cap->pinned_object_block = NULL;
cap->pinned_object_blocks = NULL;
@@ -409,36 +416,44 @@ void
moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
- uint32_t i;
- Capability **old_capabilities = capabilities;
+ Capability **new_capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
- capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
+ // We must disable the timer while we do this since the tick handler may
+ // call contextSwitchAllCapabilities, which may see the capabilities array
+ // as we free it. The alternative would be to protect the capabilities
+ // array with a lock but this seems more expensive than necessary.
+ // See #17289.
+ stopTimer();
if (to == 1) {
// THREADED_RTS must work on builds that don't have a mutable
// BaseReg (eg. unregisterised), so in this case
// capabilities[0] must coincide with &MainCapability.
- capabilities[0] = &MainCapability;
+ new_capabilities[0] = &MainCapability;
initCapability(&MainCapability, 0);
}
else
{
- for (i = 0; i < to; i++) {
+ for (uint32_t i = 0; i < to; i++) {
if (i < from) {
- capabilities[i] = old_capabilities[i];
+ new_capabilities[i] = capabilities[i];
} else {
- capabilities[i] = stgMallocBytes(sizeof(Capability),
- "moreCapabilities");
- initCapability(capabilities[i], i);
+ new_capabilities[i] = stgMallocBytes(sizeof(Capability),
+ "moreCapabilities");
+ initCapability(new_capabilities[i], i);
}
}
}
debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
+ Capability **old_capabilities = ACQUIRE_LOAD(&capabilities);
+ RELEASE_STORE(&capabilities, new_capabilities);
if (old_capabilities != NULL) {
stgFree(old_capabilities);
}
+
+ startTimer();
#endif
}
@@ -504,6 +519,9 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
* The current Task (cap->task) releases the Capability. The Capability is
* marked free, and if there is any work to do, an appropriate Task is woken up.
*
+ * The caller must hold cap->lock and will still hold it after
+ * releaseCapability returns.
+ *
* N.B. May need to take all_tasks_mutex.
*
* ------------------------------------------------------------------------- */
@@ -519,8 +537,9 @@ releaseCapability_ (Capability* cap,
ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
ASSERT_RETURNING_TASKS(cap,task);
+ ASSERT_LOCK_HELD(&cap->lock);
- cap->running_task = NULL;
+ RELAXED_STORE(&cap->running_task, NULL);
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
@@ -539,7 +558,7 @@ releaseCapability_ (Capability* cap,
// be currently in waitForCapability() waiting for this
// capability, in which case simply setting it as free would not
// wake up the waiting task.
- PendingSync *sync = pending_sync;
+ PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
return;
@@ -563,7 +582,7 @@ releaseCapability_ (Capability* cap,
// is interrupted, we only create a worker task if there
// are threads that need to be completed. If the system is
// shutting down, we never create a new worker.
- if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
+ if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
debugTrace(DEBUG_sched,
"starting new worker on capability %d", cap->no);
startWorkerTask(cap);
@@ -586,7 +605,7 @@ releaseCapability_ (Capability* cap,
#if defined(PROFILING)
cap->r.rCCCS = CCS_IDLE;
#endif
- last_free_capability[cap->node] = cap;
+ RELAXED_STORE(&last_free_capability[cap->node], cap);
debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
@@ -638,6 +657,36 @@ enqueueWorker (Capability* cap USED_IF_THREADS)
#endif
+/*
+ * Note [Benign data race due to work-pushing]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * #17276 points out a tricky data race (noticed by ThreadSanitizer) between
+ * waitForWorkerCapability and schedulePushWork. In short, schedulePushWork
+ * works as follows:
+ *
+ * 1. collect the set of all idle capabilities, take cap->lock of each.
+ *
+ * 2. sort through each TSO on the calling capability's run queue and push
+ * some to idle capabilities. This may (if the TSO is a bound thread)
+ * involve setting tso->bound->task->cap despite not holding
+ * tso->bound->task->lock.
+ *
+ * 3. release cap->lock of all idle capabilities.
+ *
+ * Now, step 2 is in principle safe since the capability of the caller of
+ * schedulePushWork *owns* the TSO and therefore the Task to which it is bound.
+ * Furthermore, step 3 ensures that the write in step (2) will be visible to
+ * any core which starts execution of the previously-idle capability.
+ *
+ * However, this argument doesn't quite work for waitForWorkerCapability, which
+ * reads task->cap *without* first owning the capability which owns `task`.
+ * For this reason, we check again whether the task has been migrated to
+ * another capability after taking task->cap->lock. See Note [migrated bound
+ * threads] above.
+ *
+ */
+
/* ----------------------------------------------------------------------------
* waitForWorkerCapability(task)
*
@@ -655,7 +704,13 @@ static Capability * waitForWorkerCapability (Task *task)
ACQUIRE_LOCK(&task->lock);
// task->lock held, cap->lock not held
if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ // The happens-after matches the happens-before in
+ // schedulePushWork, which does owns 'task' when it sets 'task->cap'.
+ TSAN_ANNOTATE_HAPPENS_AFTER(&task->cap);
cap = task->cap;
+
+ // See Note [Benign data race due to work-pushing].
+ TSAN_ANNOTATE_BENIGN_RACE(&task->cap, "we will double-check this below");
task->wakeup = false;
RELEASE_LOCK(&task->lock);
@@ -691,7 +746,7 @@ static Capability * waitForWorkerCapability (Task *task)
cap->n_spare_workers--;
}
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
break;
}
@@ -732,7 +787,7 @@ static Capability * waitForReturnCapability (Task *task)
RELEASE_LOCK(&cap->lock);
continue;
}
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
popReturningTask(cap);
RELEASE_LOCK(&cap->lock);
break;
@@ -745,6 +800,65 @@ static Capability * waitForReturnCapability (Task *task)
#endif /* THREADED_RTS */
+#if defined(THREADED_RTS)
+
+/* ----------------------------------------------------------------------------
+ * capability_is_busy (Capability *cap)
+ *
+ * A predicate for determining whether the given Capability is currently running
+ * a Task. This can be safely called without holding the Capability's lock
+ * although the result may be inaccurate if it races with the scheduler.
+ * Consequently there is a TSAN suppression for it.
+ *
+ * ------------------------------------------------------------------------- */
+static bool capability_is_busy(const Capability * cap)
+{
+ return RELAXED_LOAD(&cap->running_task) != NULL;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * find_capability_for_task
+ *
+ * Given a Task, identify a reasonable Capability to run it on. We try to
+ * find an idle capability if possible.
+ *
+ * ------------------------------------------------------------------------- */
+
+static Capability * find_capability_for_task(const Task * task)
+{
+ if (task->preferred_capability != -1) {
+ // Does the task have a preferred capability? If so, use it
+ return capabilities[task->preferred_capability %
+ enabled_capabilities];
+ } else {
+ // Try last_free_capability first
+ Capability *cap = RELAXED_LOAD(&last_free_capability[task->node]);
+
+ // N.B. There is a data race here since we are loking at
+ // cap->running_task without taking cap->lock. However, this is
+ // benign since the result is merely guiding our search heuristic.
+ if (!capability_is_busy(cap)) {
+ return cap;
+ } else {
+ // The last_free_capability is already busy, search for a free
+ // capability on this node.
+ for (uint32_t i = task->node; i < enabled_capabilities;
+ i += n_numa_nodes) {
+ // visits all the capabilities on this node, because
+ // cap[i]->node == i % n_numa_nodes
+ if (!RELAXED_LOAD(&capabilities[i]->running_task)) {
+ return capabilities[i];
+ }
+ }
+
+ // Can't find a free one, use last_free_capability.
+ return RELAXED_LOAD(&last_free_capability[task->node]);
+ }
+ }
+}
+#endif /* THREADED_RTS */
+
/* ----------------------------------------------------------------------------
* waitForCapability (Capability **pCap, Task *task)
*
@@ -767,38 +881,13 @@ void waitForCapability (Capability **pCap, Task *task)
*pCap = &MainCapability;
#else
- uint32_t i;
Capability *cap = *pCap;
if (cap == NULL) {
- if (task->preferred_capability != -1) {
- cap = capabilities[task->preferred_capability %
- enabled_capabilities];
- } else {
- // Try last_free_capability first
- cap = last_free_capability[task->node];
- if (cap->running_task) {
- // Otherwise, search for a free capability on this node.
- cap = NULL;
- for (i = task->node; i < enabled_capabilities;
- i += n_numa_nodes) {
- // visits all the capabilities on this node, because
- // cap[i]->node == i % n_numa_nodes
- if (!capabilities[i]->running_task) {
- cap = capabilities[i];
- break;
- }
- }
- if (cap == NULL) {
- // Can't find a free one, use last_free_capability.
- cap = last_free_capability[task->node];
- }
- }
- }
+ cap = find_capability_for_task(task);
// record the Capability as the one this Task is now associated with.
task->cap = cap;
-
} else {
ASSERT(task->cap == cap);
}
@@ -808,7 +897,7 @@ void waitForCapability (Capability **pCap, Task *task)
ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
// It's free; just grab it
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
} else {
newReturningTask(cap,task);
@@ -872,7 +961,7 @@ yieldCapability
if (gcAllowed)
{
- PendingSync *sync = pending_sync;
+ PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
if (sync) {
switch (sync->type) {
@@ -943,33 +1032,41 @@ yieldCapability
#endif /* THREADED_RTS */
-// Note [migrated bound threads]
-//
-// There's a tricky case where:
-// - cap A is running an unbound thread T1
-// - there is a bound thread T2 at the head of the run queue on cap A
-// - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
-// - T1 returns quickly grabbing A again (T2 is still waking up on A)
-// - T1 blocks, the scheduler migrates T2 to cap B
-// - the task bound to T2 wakes up on cap B
-//
-// We take advantage of the following invariant:
-//
-// - A bound thread can only be migrated by the holder of the
-// Capability on which the bound thread currently lives. So, if we
-// hold Capability C, and task->cap == C, then task cannot be
-// migrated under our feet.
-
-// Note [migrated bound threads 2]
-//
-// Second tricky case;
-// - A bound Task becomes a GC thread
-// - scheduleDoGC() migrates the thread belonging to this Task,
-// because the Capability it is on is disabled
-// - after GC, gcWorkerThread() returns, but now we are
-// holding a Capability that is not the same as task->cap
-// - Hence we must check for this case and immediately give up the
-// cap we hold.
+/*
+ * Note [migrated bound threads]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * There's a tricky case where:
+ * - cap A is running an unbound thread T1
+ * - there is a bound thread T2 at the head of the run queue on cap A
+ * - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
+ * - T1 returns quickly grabbing A again (T2 is still waking up on A)
+ * - T1 blocks, the scheduler migrates T2 to cap B
+ * - the task bound to T2 wakes up on cap B
+ *
+ * We take advantage of the following invariant:
+ *
+ * - A bound thread can only be migrated by the holder of the
+ * Capability on which the bound thread currently lives. So, if we
+ * hold Capability C, and task->cap == C, then task cannot be
+ * migrated under our feet.
+ *
+ * See also Note [Benign data race due to work-pushing].
+ *
+ *
+ * Note [migrated bound threads 2]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * Second tricky case;
+ * - A bound Task becomes a GC thread
+ * - scheduleDoGC() migrates the thread belonging to this Task,
+ * because the Capability it is on is disabled
+ * - after GC, gcWorkerThread() returns, but now we are
+ * holding a Capability that is not the same as task->cap
+ * - Hence we must check for this case and immediately give up the
+ * cap we hold.
+ *
+ */
/* ----------------------------------------------------------------------------
* prodCapability
@@ -1006,7 +1103,10 @@ bool
tryGrabCapability (Capability *cap, Task *task)
{
int r;
- if (cap->running_task != NULL) return false;
+ // N.B. This is benign as we will check again after taking the lock.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->running_task, "tryGrabCapability (cap->running_task)");
+ if (RELAXED_LOAD(&cap->running_task) != NULL) return false;
+
r = TRY_ACQUIRE_LOCK(&cap->lock);
if (r != 0) return false;
if (cap->running_task != NULL) {
@@ -1014,7 +1114,7 @@ tryGrabCapability (Capability *cap, Task *task)
return false;
}
task->cap = cap;
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
return true;
}
@@ -1263,7 +1363,7 @@ void
setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#if defined(THREADED_RTS)
if (cap_no < n_capabilities) {
- capabilities[cap_no]->io_manager_control_wr_fd = fd;
+ RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd);
} else {
errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
}
diff --git a/rts/Capability.h b/rts/Capability.h
index 82046c0b9e..8c5b1e814e 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -100,6 +100,8 @@ struct Capability_ {
// Context switch flag. When non-zero, this means: stop running
// Haskell code, and switch threads.
+ //
+ // Does not require lock to read or write.
int context_switch;
// Interrupt flag. Like the context_switch flag, this also
@@ -110,6 +112,8 @@ struct Capability_ {
// The interrupt flag is always reset before we start running
// Haskell code, unlike the context_switch flag which is only
// reset after we have executed the context switch.
+ //
+ // Does not require lock to read or write.
int interrupt;
// Total words allocated by this cap since rts start
@@ -178,10 +182,10 @@ struct Capability_ {
#endif
// These properties should be true when a Task is holding a Capability
-#define ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task) \
- ASSERT(cap->running_task != NULL && cap->running_task == task); \
- ASSERT(task->cap == cap); \
- ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task)
+#define ASSERT_FULL_CAPABILITY_INVARIANTS(_cap,_task) \
+ ASSERT(_cap->running_task != NULL && _cap->running_task == _task); \
+ ASSERT(_task->cap == _cap); \
+ ASSERT_PARTIAL_CAPABILITY_INVARIANTS(_cap,_task)
// This assert requires cap->lock to be held, so it can't be part of
// ASSERT_PARTIAL_CAPABILITY_INVARIANTS()
@@ -415,14 +419,16 @@ recordMutableCap (const StgClosure *p, Capability *cap, uint32_t gen)
// ASSERT(cap->running_task == myTask());
// NO: assertion is violated by performPendingThrowTos()
bd = cap->mut_lists[gen];
- if (bd->free >= bd->start + BLOCK_SIZE_W) {
+ if (RELAXED_LOAD(&bd->free) >= bd->start + BLOCK_SIZE_W) {
bdescr *new_bd;
new_bd = allocBlockOnNode_lock(cap->node);
new_bd->link = bd;
+ new_bd->free = new_bd->start;
bd = new_bd;
cap->mut_lists[gen] = bd;
}
- *bd->free++ = (StgWord)p;
+ RELAXED_STORE(bd->free, (StgWord) p);
+ NONATOMIC_ADD(&bd->free, 1);
}
EXTERN_INLINE void
@@ -456,29 +462,33 @@ stopCapability (Capability *cap)
// It may not work - the thread might be updating HpLim itself
// at the same time - so we also have the context_switch/interrupted
// flags as a sticky way to tell the thread to stop.
- cap->r.rHpLim = NULL;
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->r.rHpLim, "stopCapability");
+ SEQ_CST_STORE(&cap->r.rHpLim, NULL);
}
INLINE_HEADER void
interruptCapability (Capability *cap)
{
stopCapability(cap);
- cap->interrupt = 1;
+ SEQ_CST_STORE(&cap->interrupt, true);
}
INLINE_HEADER void
contextSwitchCapability (Capability *cap)
{
stopCapability(cap);
- cap->context_switch = 1;
+ SEQ_CST_STORE(&cap->context_switch, true);
}
#if defined(THREADED_RTS)
INLINE_HEADER bool emptyInbox(Capability *cap)
{
- return (cap->inbox == (Message*)END_TSO_QUEUE &&
- cap->putMVars == NULL);
+ // This may race with writes to putMVars and inbox but this harmless for the
+ // intended uses of this function.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->putMVars, "emptyInbox(cap->putMVars)");
+ return (RELAXED_LOAD(&cap->inbox) == (Message*)END_TSO_QUEUE &&
+ RELAXED_LOAD(&cap->putMVars) == NULL);
}
#endif
diff --git a/rts/Messages.c b/rts/Messages.c
index 2f80370845..285ca5be63 100644
--- a/rts/Messages.c
+++ b/rts/Messages.c
@@ -39,7 +39,7 @@ void sendMessage(Capability *from_cap, Capability *to_cap, Message *msg)
#endif
msg->link = to_cap->inbox;
- to_cap->inbox = msg;
+ RELAXED_STORE(&to_cap->inbox, msg);
recordClosureMutated(from_cap,(StgClosure*)msg);
@@ -68,8 +68,7 @@ executeMessage (Capability *cap, Message *m)
const StgInfoTable *i;
loop:
- write_barrier(); // allow m->header to be modified by another thread
- i = m->header.info;
+ i = ACQUIRE_LOAD(&m->header.info);
if (i == &stg_MSG_TRY_WAKEUP_info)
{
StgTSO *tso = ((MessageWakeup *)m)->tso;
@@ -127,7 +126,7 @@ loop:
else if (i == &stg_WHITEHOLE_info)
{
#if defined(PROF_SPIN)
- ++whitehole_executeMessage_spin;
+ NONATOMIC_ADD(&whitehole_executeMessage_spin, 1);
#endif
goto loop;
}
@@ -169,8 +168,7 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
debugTraceCap(DEBUG_sched, cap, "message: thread %d blocking on "
"blackhole %p", (W_)msg->tso->id, msg->bh);
- info = bh->header.info;
- load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+ info = ACQUIRE_LOAD(&bh->header.info);
// If we got this message in our inbox, it might be that the
// BLACKHOLE has already been updated, and GC has shorted out the
@@ -190,11 +188,11 @@ uint32_t messageBlackHole(Capability *cap, MessageBlackHole *msg)
// The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or a value.
loop:
- // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
- // and turns this into an infinite loop.
- p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
- info = p->header.info;
- load_load_barrier(); // See Note [Heap memory barriers] in SMP.h
+ // If we are being called from stg_BLACKHOLE then TSAN won't know about the
+ // previous read barrier that makes the following access safe.
+ TSAN_ANNOTATE_BENIGN_RACE(&((StgInd*)bh)->indirectee, "messageBlackHole");
+ p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+ info = RELAXED_LOAD(&p->header.info);
if (info == &stg_IND_info)
{
@@ -240,9 +238,8 @@ loop:
// We are about to make the newly-constructed message visible to other cores;
// a barrier is necessary to ensure that all writes are visible.
// See Note [Heap memory barriers] in SMP.h.
- write_barrier();
dirty_TSO(cap, owner); // we will modify owner->bq
- owner->bq = bq;
+ RELEASE_STORE(&owner->bq, bq);
// If the owner of the blackhole is currently runnable, then
// bump it to the front of the run queue. This gives the
@@ -258,11 +255,11 @@ loop:
}
// point to the BLOCKING_QUEUE from the BLACKHOLE
- write_barrier(); // make the BQ visible, see Note [Heap memory barriers].
+ // RELEASE to make the BQ visible, see Note [Heap memory barriers].
+ RELEASE_STORE(&((StgInd*)bh)->indirectee, (StgClosure *)bq);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
updateRemembSetPushClosure(cap, (StgClosure*)p);
}
- ((StgInd*)bh)->indirectee = (StgClosure *)bq;
recordClosureMutated(cap,bh); // bh was mutated
debugTraceCap(DEBUG_sched, cap, "thread %d blocked on thread %d",
@@ -295,14 +292,14 @@ loop:
// makes it into the update remembered set
updateRemembSetPushClosure(cap, (StgClosure*)bq->queue);
}
- msg->link = bq->queue;
+ RELAXED_STORE(&msg->link, bq->queue);
bq->queue = msg;
// No barrier is necessary here: we are only exposing the
// closure to the GC. See Note [Heap memory barriers] in SMP.h.
recordClosureMutated(cap,(StgClosure*)msg);
if (info == &stg_BLOCKING_QUEUE_CLEAN_info) {
- bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ RELAXED_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
// No barrier is necessary here: we are only exposing the
// closure to the GC. See Note [Heap memory barriers] in SMP.h.
recordClosureMutated(cap,(StgClosure*)bq);
@@ -333,7 +330,7 @@ StgTSO * blackHoleOwner (StgClosure *bh)
const StgInfoTable *info;
StgClosure *p;
- info = bh->header.info;
+ info = RELAXED_LOAD(&bh->header.info);
if (info != &stg_BLACKHOLE_info &&
info != &stg_CAF_BLACKHOLE_info &&
@@ -345,10 +342,8 @@ StgTSO * blackHoleOwner (StgClosure *bh)
// The blackhole must indirect to a TSO, a BLOCKING_QUEUE, an IND,
// or a value.
loop:
- // NB. VOLATILE_LOAD(), because otherwise gcc hoists the load
- // and turns this into an infinite loop.
- p = UNTAG_CLOSURE((StgClosure*)VOLATILE_LOAD(&((StgInd*)bh)->indirectee));
- info = p->header.info;
+ p = UNTAG_CLOSURE(ACQUIRE_LOAD(&((StgInd*)bh)->indirectee));
+ info = RELAXED_LOAD(&p->header.info);
if (info == &stg_IND_info) goto loop;
@@ -360,7 +355,7 @@ loop:
info == &stg_BLOCKING_QUEUE_DIRTY_info)
{
StgBlockingQueue *bq = (StgBlockingQueue *)p;
- return bq->owner;
+ return RELAXED_LOAD(&bq->owner);
}
return NULL; // not blocked
diff --git a/rts/Proftimer.c b/rts/Proftimer.c
index 68a73a5446..24f82ead6d 100644
--- a/rts/Proftimer.c
+++ b/rts/Proftimer.c
@@ -30,7 +30,7 @@ void
stopProfTimer( void )
{
#if defined(PROFILING)
- do_prof_ticks = false;
+ RELAXED_STORE(&do_prof_ticks, false);
#endif
}
@@ -38,14 +38,14 @@ void
startProfTimer( void )
{
#if defined(PROFILING)
- do_prof_ticks = true;
+ RELAXED_STORE(&do_prof_ticks, true);
#endif
}
void
stopHeapProfTimer( void )
{
- do_heap_prof_ticks = false;
+ RELAXED_STORE(&do_heap_prof_ticks, false);
}
void
@@ -74,7 +74,7 @@ handleProfTick(void)
{
#if defined(PROFILING)
total_ticks++;
- if (do_prof_ticks) {
+ if (RELAXED_LOAD(&do_prof_ticks)) {
uint32_t n;
for (n=0; n < n_capabilities; n++) {
capabilities[n]->r.rCCCS->time_ticks++;
@@ -83,7 +83,7 @@ handleProfTick(void)
}
#endif
- if (do_heap_prof_ticks) {
+ if (RELAXED_LOAD(&do_heap_prof_ticks)) {
ticks_to_heap_profile--;
if (ticks_to_heap_profile <= 0) {
ticks_to_heap_profile = RtsFlags.ProfFlags.heapProfileIntervalTicks;
diff --git a/rts/RaiseAsync.c b/rts/RaiseAsync.c
index 719c05435d..a3593fe7a6 100644
--- a/rts/RaiseAsync.c
+++ b/rts/RaiseAsync.c
@@ -232,7 +232,7 @@ uint32_t
throwToMsg (Capability *cap, MessageThrowTo *msg)
{
StgWord status;
- StgTSO *target = msg->target;
+ StgTSO *target = ACQUIRE_LOAD(&msg->target);
Capability *target_cap;
goto check_target;
@@ -245,8 +245,9 @@ check_target:
ASSERT(target != END_TSO_QUEUE);
// Thread already dead?
- if (target->what_next == ThreadComplete
- || target->what_next == ThreadKilled) {
+ StgWord16 what_next = SEQ_CST_LOAD(&target->what_next);
+ if (what_next == ThreadComplete
+ || what_next == ThreadKilled) {
return THROWTO_SUCCESS;
}
@@ -988,7 +989,7 @@ raiseAsync(Capability *cap, StgTSO *tso, StgClosure *exception,
sp[0] = (W_)raise;
sp[-1] = (W_)&stg_enter_info;
stack->sp = sp-1;
- tso->what_next = ThreadRunGHC;
+ RELAXED_STORE(&tso->what_next, ThreadRunGHC);
goto done;
}
diff --git a/rts/RtsStartup.c b/rts/RtsStartup.c
index a3dddb03f5..5e2495844c 100644
--- a/rts/RtsStartup.c
+++ b/rts/RtsStartup.c
@@ -285,6 +285,13 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config)
/* Initialise libdw session pool */
libdwPoolInit();
+ /* Start the "ticker" and profiling timer but don't start until the
+ * scheduler is up. However, the ticker itself needs to be initialized
+ * before the scheduler to ensure that the ticker mutex is initialized as
+ * moreCapabilities will attempt to acquire it.
+ */
+ initTimer();
+
/* initialise scheduler data structures (needs to be done before
* initStorage()).
*/
@@ -366,7 +373,6 @@ hs_init_ghc(int *argc, char **argv[], RtsConfig rts_config)
initHeapProfiling();
/* start the virtual timer 'subsystem'. */
- initTimer();
startTimer();
#if defined(RTS_USER_SIGNALS)
@@ -575,6 +581,10 @@ hs_exit_(bool wait_foreign)
if (is_io_mng_native_p())
hs_restoreConsoleCP();
#endif
+
+ /* tear down statistics subsystem */
+ stat_exit();
+
/* free hash table storage */
exitHashTable();
diff --git a/rts/SMPClosureOps.h b/rts/SMPClosureOps.h
index c73821a782..2df88db06c 100644
--- a/rts/SMPClosureOps.h
+++ b/rts/SMPClosureOps.h
@@ -62,12 +62,12 @@ EXTERN_INLINE StgInfoTable *reallyLockClosure(StgClosure *p)
info = xchg((P_)(void *)&p->header.info, (W_)&stg_WHITEHOLE_info);
if (info != (W_)&stg_WHITEHOLE_info) return (StgInfoTable *)info;
#if defined(PROF_SPIN)
- ++whitehole_lockClosure_spin;
+ NONATOMIC_ADD(&whitehole_lockClosure_spin, 1);
#endif
busy_wait_nop();
} while (++i < SPIN_COUNT);
#if defined(PROF_SPIN)
- ++whitehole_lockClosure_yield;
+ NONATOMIC_ADD(&whitehole_lockClosure_yield, 1);
#endif
yieldThread();
} while (1);
@@ -119,9 +119,8 @@ tryLockClosure(StgClosure *p)
EXTERN_INLINE void unlockClosure(StgClosure *p, const StgInfoTable *info)
{
- // This is a strictly ordered write, so we need a write_barrier():
- write_barrier();
- p->header.info = info;
+ // This is a strictly ordered write, so we need a RELEASE ordering.
+ RELEASE_STORE(&p->header.info, info);
}
#endif /* CMINUSMINUS */
diff --git a/rts/STM.c b/rts/STM.c
index cff0d55082..16dd029aea 100644
--- a/rts/STM.c
+++ b/rts/STM.c
@@ -187,7 +187,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
StgTVar *s STG_UNUSED) {
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
- result = s -> current_value;
+ result = SEQ_CST_LOAD(&s->current_value);
return result;
}
@@ -198,8 +198,8 @@ static void unlock_tvar(Capability *cap,
StgBool force_update) {
TRACE("%p : unlock_tvar(%p)", trec, s);
if (force_update) {
- StgClosure *old_value = s -> current_value;
- s -> current_value = c;
+ StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+ RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, old_value);
}
}
@@ -210,7 +210,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *expected) {
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
- result = s -> current_value;
+ result = SEQ_CST_LOAD(&s->current_value);
TRACE("%p : %s", trec, (result == expected) ? "success" : "failure");
return (result == expected);
}
@@ -231,7 +231,7 @@ static void lock_stm(StgTRecHeader *trec) {
static void unlock_stm(StgTRecHeader *trec STG_UNUSED) {
TRACE("%p : unlock_stm()", trec);
ASSERT(smp_locked == trec);
- smp_locked = 0;
+ SEQ_CST_STORE(&smp_locked, 0);
}
static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
@@ -240,7 +240,7 @@ static StgClosure *lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : lock_tvar(%p)", trec, s);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = SEQ_CST_LOAD(&s->current_value);
return result;
}
@@ -252,8 +252,8 @@ static void *unlock_tvar(Capability *cap,
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
ASSERT(smp_locked == trec);
if (force_update) {
- StgClosure *old_value = s -> current_value;
- s -> current_value = c;
+ StgClosure *old_value = SEQ_CST_LOAD(&s->current_value);
+ RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, old_value);
}
}
@@ -265,7 +265,7 @@ static StgBool cond_lock_tvar(Capability *cap STG_UNUSED,
StgClosure *result;
TRACE("%p : cond_lock_tvar(%p, %p)", trec, s, expected);
ASSERT(smp_locked == trec);
- result = s -> current_value;
+ result = SEQ_CST_LOAD(&s->current_value);
TRACE("%p : %d", result ? "success" : "failure");
return (result == expected);
}
@@ -292,9 +292,9 @@ static StgClosure *lock_tvar(Capability *cap,
TRACE("%p : lock_tvar(%p)", trec, s);
do {
do {
- result = s -> current_value;
+ result = SEQ_CST_LOAD(&s->current_value);
} while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info);
- } while (cas((void *)&(s -> current_value),
+ } while (cas((void *) &s->current_value,
(StgWord)result, (StgWord)trec) != (StgWord)result);
@@ -311,8 +311,8 @@ static void unlock_tvar(Capability *cap,
StgClosure *c,
StgBool force_update STG_UNUSED) {
TRACE("%p : unlock_tvar(%p, %p)", trec, s, c);
- ASSERT(s -> current_value == (StgClosure *)trec);
- s -> current_value = c;
+ ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+ RELEASE_STORE(&s->current_value, c);
dirty_TVAR(cap, s, (StgClosure *) trec);
}
@@ -375,7 +375,7 @@ static void unpark_waiters_on(Capability *cap, StgTVar *s) {
StgTVarWatchQueue *trail;
TRACE("unpark_waiters_on tvar=%p", s);
// unblock TSOs in reverse order, to be a bit fairer (#2319)
- for (q = s -> first_watch_queue_entry, trail = q;
+ for (q = SEQ_CST_LOAD(&s->first_watch_queue_entry), trail = q;
q != END_STM_WATCH_QUEUE;
q = q -> next_queue_entry) {
trail = q;
@@ -532,16 +532,16 @@ static void build_watch_queue_entries_for_trec(Capability *cap,
StgTVarWatchQueue *fq;
s = e -> tvar;
TRACE("%p : adding tso=%p to watch queue for tvar=%p", trec, tso, s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
- NACQ_ASSERT(s -> current_value == e -> expected_value);
- fq = s -> first_watch_queue_entry;
+ ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
+ NACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == e -> expected_value);
+ fq = SEQ_CST_LOAD(&s->first_watch_queue_entry);
q = alloc_stg_tvar_watch_queue(cap, (StgClosure*) tso);
q -> next_queue_entry = fq;
q -> prev_queue_entry = END_STM_WATCH_QUEUE;
if (fq != END_STM_WATCH_QUEUE) {
fq -> prev_queue_entry = q;
}
- s -> first_watch_queue_entry = q;
+ SEQ_CST_STORE(&s->first_watch_queue_entry, q);
e -> new_value = (StgClosure *) q;
dirty_TVAR(cap, s, (StgClosure *) fq); // we modified first_watch_queue_entry
});
@@ -569,7 +569,7 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
trec,
q -> closure,
s);
- ACQ_ASSERT(s -> current_value == (StgClosure *)trec);
+ ACQ_ASSERT(SEQ_CST_LOAD(&s->current_value) == (StgClosure *)trec);
nq = q -> next_queue_entry;
pq = q -> prev_queue_entry;
if (nq != END_STM_WATCH_QUEUE) {
@@ -578,8 +578,8 @@ static void remove_watch_queue_entries_for_trec(Capability *cap,
if (pq != END_STM_WATCH_QUEUE) {
pq -> next_queue_entry = nq;
} else {
- ASSERT(s -> first_watch_queue_entry == q);
- s -> first_watch_queue_entry = nq;
+ ASSERT(SEQ_CST_LOAD(&s->first_watch_queue_entry) == q);
+ SEQ_CST_STORE(&s->first_watch_queue_entry, nq);
dirty_TVAR(cap, s, (StgClosure *) q); // we modified first_watch_queue_entry
}
free_stg_tvar_watch_queue(cap, q);
@@ -727,7 +727,7 @@ static StgBool entry_is_read_only(TRecEntry *e) {
static StgBool tvar_is_locked(StgTVar *s, StgTRecHeader *h) {
StgClosure *c;
StgBool result;
- c = s -> current_value;
+ c = SEQ_CST_LOAD(&s->current_value);
result = (c == (StgClosure *) h);
return result;
}
@@ -800,13 +800,16 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
ASSERT(config_use_read_phase);
IF_STM_FG_LOCKS({
TRACE("%p : will need to check %p", trec, s);
- if (s -> current_value != e -> expected_value) {
+ // The memory ordering here must ensure that we have two distinct
+ // reads to current_value, with the read from num_updates between
+ // them.
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match", trec);
result = false;
BREAK_FOR_EACH;
}
- e -> num_updates = s -> num_updates;
- if (s -> current_value != e -> expected_value) {
+ e->num_updates = SEQ_CST_LOAD(&s->num_updates);
+ if (SEQ_CST_LOAD(&s->current_value) != e -> expected_value) {
TRACE("%p : doesn't match (race)", trec);
result = false;
BREAK_FOR_EACH;
@@ -828,7 +831,7 @@ static StgBool validate_and_acquire_ownership (Capability *cap,
// check_read_only : check that we've seen an atomic snapshot of the
// non-updated TVars accessed by a trec. This checks that the last TRec to
// commit an update to the TVar is unchanged since the value was stashed in
-// validate_and_acquire_ownership. If no update is seen to any TVar than
+// validate_and_acquire_ownership. If no update is seen to any TVar then
// all of them contained their expected values at the start of the call to
// check_read_only.
//
@@ -847,11 +850,16 @@ static StgBool check_read_only(StgTRecHeader *trec STG_UNUSED) {
if (entry_is_read_only(e)) {
TRACE("%p : check_read_only for TVar %p, saw %ld", trec, s, e -> num_updates);
+ // We must first load current_value then num_updates; this is inverse of
+ // the order of the stores in stmCommitTransaction.
+ StgClosure *current_value = SEQ_CST_LOAD(&s->current_value);
+ StgInt num_updates = SEQ_CST_LOAD(&s->num_updates);
+
// Note we need both checks and in this order as the TVar could be
// locked by another transaction that is committing but has not yet
// incremented `num_updates` (See #7815).
- if (s -> current_value != e -> expected_value ||
- s -> num_updates != e -> num_updates) {
+ if (current_value != e->expected_value ||
+ num_updates != e->num_updates) {
TRACE("%p : mismatch", trec);
result = false;
BREAK_FOR_EACH;
@@ -887,17 +895,22 @@ void stmPreGCHook (Capability *cap) {
#define TOKEN_BATCH_SIZE 1024
+#if defined(THREADED_RTS)
+
static volatile StgInt64 max_commits = 0;
-#if defined(THREADED_RTS)
static volatile StgWord token_locked = false;
+static StgInt64 getMaxCommits(void) {
+ return RELAXED_LOAD(&max_commits);
+}
+
static void getTokenBatch(Capability *cap) {
while (cas((void *)&token_locked, false, true) == true) { /* nothing */ }
- max_commits += TOKEN_BATCH_SIZE;
- TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, max_commits);
+ NONATOMIC_ADD(&max_commits, TOKEN_BATCH_SIZE);
+ TRACE("%p : cap got token batch, max_commits=%" FMT_Int64, cap, RELAXED_LOAD(&max_commits));
cap -> transaction_tokens = TOKEN_BATCH_SIZE;
- token_locked = false;
+ RELEASE_STORE(&token_locked, false);
}
static void getToken(Capability *cap) {
@@ -907,6 +920,10 @@ static void getToken(Capability *cap) {
cap -> transaction_tokens --;
}
#else
+static StgInt64 getMaxCommits(void) {
+ return 0;
+}
+
static void getToken(Capability *cap STG_UNUSED) {
// Nothing
}
@@ -1062,7 +1079,7 @@ static TRecEntry *get_entry_for(StgTRecHeader *trec, StgTVar *tvar, StgTRecHeade
/*......................................................................*/
StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
- StgInt64 max_commits_at_start = max_commits;
+ StgInt64 max_commits_at_start = getMaxCommits();
TRACE("%p : stmCommitTransaction()", trec);
ASSERT(trec != NO_TREC);
@@ -1088,7 +1105,7 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
result = check_read_only(trec);
TRACE("%p : read-check %s", trec, result ? "succeeded" : "failed");
- max_commits_at_end = max_commits;
+ max_commits_at_end = getMaxCommits();
max_concurrent_commits = ((max_commits_at_end - max_commits_at_start) +
(n_capabilities * TOKEN_BATCH_SIZE));
if (((max_concurrent_commits >> 32) > 0) || shake()) {
@@ -1113,7 +1130,8 @@ StgBool stmCommitTransaction(Capability *cap, StgTRecHeader *trec) {
TRACE("%p : writing %p to %p, waking waiters", trec, e -> new_value, s);
unpark_waiters_on(cap,s);
IF_STM_FG_LOCKS({
- s -> num_updates ++;
+ // We have locked the TVar therefore nonatomic addition is sufficient
+ NONATOMIC_ADD(&s->num_updates, 1);
});
unlock_tvar(cap, trec, s, e -> new_value, true);
}
@@ -1269,12 +1287,12 @@ StgBool stmReWait(Capability *cap, StgTSO *tso) {
static StgClosure *read_current_value(StgTRecHeader *trec STG_UNUSED, StgTVar *tvar) {
StgClosure *result;
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
#if defined(STM_FG_LOCKS)
while (GET_INFO(UNTAG_CLOSURE(result)) == &stg_TREC_HEADER_info) {
TRACE("%p : read_current_value(%p) saw %p", trec, tvar, result);
- result = tvar -> current_value;
+ result = ACQUIRE_LOAD(&tvar->current_value);
}
#endif
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 75a6f545ec..b7c1e7b0c0 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -203,6 +203,7 @@ schedule (Capability *initialCapability, Task *task)
// Pre-condition: this task owns initialCapability.
// The sched_mutex is *NOT* held
// NB. on return, we still hold a capability.
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
debugTrace (DEBUG_sched, "cap %d: schedule()", initialCapability->no);
@@ -210,6 +211,7 @@ schedule (Capability *initialCapability, Task *task)
// Scheduler loop starts here:
while (1) {
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
// Check whether we have re-entered the RTS from Haskell without
// going via suspendThread()/resumeThread (i.e. a 'safe' foreign
@@ -258,7 +260,7 @@ schedule (Capability *initialCapability, Task *task)
// * We might be left with threads blocked in foreign calls,
// we should really attempt to kill these somehow (TODO).
- switch (sched_state) {
+ switch (RELAXED_LOAD(&sched_state)) {
case SCHED_RUNNING:
break;
case SCHED_INTERRUPTING:
@@ -270,7 +272,7 @@ schedule (Capability *initialCapability, Task *task)
// other Capability did the final GC, or we did it above,
// either way we can fall through to the SCHED_SHUTTING_DOWN
// case now.
- ASSERT(sched_state == SCHED_SHUTTING_DOWN);
+ ASSERT(RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN);
// fall through
case SCHED_SHUTTING_DOWN:
@@ -372,7 +374,7 @@ schedule (Capability *initialCapability, Task *task)
// killed, kill it now. This sometimes happens when a finalizer
// thread is created by the final GC, or a thread previously
// in a foreign call returns.
- if (sched_state >= SCHED_INTERRUPTING &&
+ if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING &&
!(t->what_next == ThreadComplete || t->what_next == ThreadKilled)) {
deleteThread(t);
}
@@ -403,7 +405,7 @@ schedule (Capability *initialCapability, Task *task)
*/
if (RtsFlags.ConcFlags.ctxtSwitchTicks == 0
&& !emptyThreadQueues(cap)) {
- cap->context_switch = 1;
+ RELAXED_STORE(&cap->context_switch, 1);
}
run_thread:
@@ -430,15 +432,15 @@ run_thread:
#endif
// reset the interrupt flag before running Haskell code
- cap->interrupt = 0;
+ RELAXED_STORE(&cap->interrupt, false);
cap->in_haskell = true;
- cap->idle = 0;
+ RELAXED_STORE(&cap->idle, false);
dirty_TSO(cap,t);
dirty_STACK(cap,t->stackobj);
- switch (recent_activity)
+ switch (SEQ_CST_LOAD(&recent_activity))
{
case ACTIVITY_DONE_GC: {
// ACTIVITY_DONE_GC means we turned off the timer signal to
@@ -459,7 +461,7 @@ run_thread:
// wakeUpRts().
break;
default:
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
}
traceEventRunThread(cap, t);
@@ -652,8 +654,16 @@ shouldYieldCapability (Capability *cap, Task *task, bool didGcLast)
// Capability keeps forcing a GC and the other Capabilities make no
// progress at all.
- return ((pending_sync && !didGcLast) ||
- cap->n_returning_tasks != 0 ||
+ // Note [Data race in shouldYieldCapability]
+ // ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ // We would usually need to hold cap->lock to look at n_returning_tasks but
+ // we don't here since this is just an approximate predicate. We
+ // consequently need to use atomic accesses whenever touching
+ // n_returning_tasks. However, since this is an approximate predicate we can
+ // use a RELAXED ordering.
+
+ return ((RELAXED_LOAD(&pending_sync) && !didGcLast) ||
+ RELAXED_LOAD(&cap->n_returning_tasks) != 0 ||
(!emptyRunQueue(cap) && (task->incall->tso == NULL
? peekRunQueue(cap)->bound != NULL
: peekRunQueue(cap)->bound != task->incall)));
@@ -680,7 +690,7 @@ scheduleYield (Capability **pcap, Task *task)
if (!shouldYieldCapability(cap,task,false) &&
(!emptyRunQueue(cap) ||
!emptyInbox(cap) ||
- sched_state >= SCHED_INTERRUPTING)) {
+ RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING)) {
return;
}
@@ -739,7 +749,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
cap0 = capabilities[i];
if (cap != cap0 && !cap0->disabled && tryGrabCapability(cap0,task)) {
if (!emptyRunQueue(cap0)
- || cap0->n_returning_tasks != 0
+ || RELAXED_LOAD(&cap0->n_returning_tasks) != 0
|| !emptyInbox(cap0)) {
// it already has some work, we just grabbed it at
// the wrong moment. Or maybe it's deadlocked!
@@ -821,7 +831,10 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
appendToRunQueue(free_caps[i],t);
traceEventMigrateThread (cap, t, free_caps[i]->no);
- if (t->bound) { t->bound->task->cap = free_caps[i]; }
+ // See Note [Benign data race due to work-pushing].
+ if (t->bound) {
+ t->bound->task->cap = free_caps[i];
+ }
t->cap = free_caps[i];
n--; // we have one fewer threads now
i++; // move on to the next free_cap
@@ -926,7 +939,7 @@ scheduleDetectDeadlock (Capability **pcap, Task *task)
* we won't eagerly start a full GC just because we don't have
* any threads to run currently.
*/
- if (recent_activity != ACTIVITY_INACTIVE) return;
+ if (SEQ_CST_LOAD(&recent_activity) != ACTIVITY_INACTIVE) return;
#endif
if (task->incall->tso && task->incall->tso->why_blocked == BlockedOnIOCompletion) return;
@@ -1127,12 +1140,12 @@ schedulePostRunThread (Capability *cap, StgTSO *t)
static bool
scheduleHandleHeapOverflow( Capability *cap, StgTSO *t )
{
- if (cap->r.rHpLim == NULL || cap->context_switch) {
+ if (cap->r.rHpLim == NULL || RELAXED_LOAD(&cap->context_switch)) {
// Sometimes we miss a context switch, e.g. when calling
// primitives in a tight loop, MAYBE_GC() doesn't check the
// context switch flag, and we end up waiting for a GC.
// See #1984, and concurrent/should_run/1984
- cap->context_switch = 0;
+ RELAXED_STORE(&cap->context_switch, 0);
appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
@@ -1233,8 +1246,8 @@ scheduleHandleYield( Capability *cap, StgTSO *t, uint32_t prev_what_next )
// the CPU because the tick always arrives during GC). This way
// penalises threads that do a lot of allocation, but that seems
// better than the alternative.
- if (cap->context_switch != 0) {
- cap->context_switch = 0;
+ if (RELAXED_LOAD(&cap->context_switch) != 0) {
+ RELAXED_STORE(&cap->context_switch, 0);
appendToRunQueue(cap,t);
} else {
pushOnRunQueue(cap,t);
@@ -1333,7 +1346,7 @@ scheduleHandleThreadFinished (Capability *cap, Task *task, StgTSO *t)
if (task->incall->ret) {
*(task->incall->ret) = NULL;
}
- if (sched_state >= SCHED_INTERRUPTING) {
+ if (RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING) {
if (heap_overflow) {
task->incall->rstat = HeapExhausted;
} else {
@@ -1493,7 +1506,7 @@ static bool requestSync
sync->type);
ASSERT(*pcap);
yieldCapability(pcap,task,true);
- sync = pending_sync;
+ sync = SEQ_CST_LOAD(&pending_sync);
} while (sync != NULL);
}
@@ -1524,7 +1537,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
Capability *tmpcap;
uint32_t i;
- ASSERT(pending_sync != NULL);
+ ASSERT(SEQ_CST_LOAD(&pending_sync) != NULL);
for (i=0; i < n_capabilities; i++) {
debugTrace(DEBUG_sched, "grabbing all the capabilies (%d/%d)",
i, n_capabilities);
@@ -1596,7 +1609,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// cycle.
#endif
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
// The final GC has already been done, and the system is
// shutting down. We'll probably deadlock if we try to GC
// now.
@@ -1611,7 +1624,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
major_gc = (collect_gen == RtsFlags.GcFlags.generations-1);
#if defined(THREADED_RTS)
- if (sched_state < SCHED_INTERRUPTING
+ if (RELAXED_LOAD(&sched_state) < SCHED_INTERRUPTING
&& RtsFlags.ParFlags.parGcEnabled
&& collect_gen >= RtsFlags.ParFlags.parGcGen
&& ! oldest_gen->mark)
@@ -1704,7 +1717,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
}
if (was_syncing &&
(prev_sync == SYNC_GC_SEQ || prev_sync == SYNC_GC_PAR) &&
- !(sched_state == SCHED_INTERRUPTING && force_major)) {
+ !(RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && force_major)) {
// someone else had a pending sync request for a GC, so
// let's assume GC has been done and we don't need to GC
// again.
@@ -1712,7 +1725,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
// need to do the final GC.
return;
}
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
// The scheduler might now be shutting down. We tested
// this above, but it might have become true since then as
// we yielded the capability in requestSync().
@@ -1795,7 +1808,7 @@ scheduleDoGC (Capability **pcap, Task *task USED_IF_THREADS,
debugTrace(DEBUG_sched, "%d idle caps", n_idle_caps);
for (i=0; i < n_capabilities; i++) {
- capabilities[i]->idle++;
+ NONATOMIC_ADD(&capabilities[i]->idle, 1);
}
// For all capabilities participating in this GC, wait until
@@ -1817,7 +1830,7 @@ delete_threads_and_gc:
* threads in the system.
* Checking for major_gc ensures that the last GC is major.
*/
- if (sched_state == SCHED_INTERRUPTING && major_gc) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_INTERRUPTING && major_gc) {
deleteAllThreads();
#if defined(THREADED_RTS)
// Discard all the sparks from every Capability. Why?
@@ -1831,7 +1844,7 @@ delete_threads_and_gc:
discardSparksCap(capabilities[i]);
}
#endif
- sched_state = SCHED_SHUTTING_DOWN;
+ RELAXED_STORE(&sched_state, SCHED_SHUTTING_DOWN);
}
/*
@@ -1876,20 +1889,20 @@ delete_threads_and_gc:
#endif
// If we're shutting down, don't leave any idle GC work to do.
- if (sched_state == SCHED_SHUTTING_DOWN) {
+ if (RELAXED_LOAD(&sched_state) == SCHED_SHUTTING_DOWN) {
doIdleGCWork(cap, true /* all of it */);
}
traceSparkCounters(cap);
- switch (recent_activity) {
+ switch (SEQ_CST_LOAD(&recent_activity)) {
case ACTIVITY_INACTIVE:
if (force_major) {
// We are doing a GC because the system has been idle for a
// timeslice and we need to check for deadlock. Record the
// fact that we've done a GC and turn off the timer signal;
// it will get re-enabled if we run any threads after the GC.
- recent_activity = ACTIVITY_DONE_GC;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
#if !defined(PROFILING)
stopTimer();
#endif
@@ -1901,7 +1914,7 @@ delete_threads_and_gc:
// the GC might have taken long enough for the timer to set
// recent_activity = ACTIVITY_MAYBE_NO or ACTIVITY_INACTIVE,
// but we aren't necessarily deadlocked:
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
break;
case ACTIVITY_DONE_GC:
@@ -1951,7 +1964,7 @@ delete_threads_and_gc:
releaseGCThreads(cap, idle_cap);
}
#endif
- if (heap_overflow && sched_state == SCHED_RUNNING) {
+ if (heap_overflow && RELAXED_LOAD(&sched_state) == SCHED_RUNNING) {
// GC set the heap_overflow flag. We should throw an exception if we
// can, or shut down otherwise.
@@ -1963,7 +1976,7 @@ delete_threads_and_gc:
// shutdown now. Ultimately we want the main thread to return to
// its caller with HeapExhausted, at which point the caller should
// call hs_exit(). The first step is to delete all the threads.
- sched_state = SCHED_INTERRUPTING;
+ RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
goto delete_threads_and_gc;
}
@@ -2042,12 +2055,14 @@ forkProcess(HsStablePtr *entry
ACQUIRE_LOCK(&sm_mutex);
ACQUIRE_LOCK(&stable_ptr_mutex);
ACQUIRE_LOCK(&stable_name_mutex);
- ACQUIRE_LOCK(&task->lock);
for (i=0; i < n_capabilities; i++) {
ACQUIRE_LOCK(&capabilities[i]->lock);
}
+ // Take task lock after capability lock to avoid order inversion (#17275).
+ ACQUIRE_LOCK(&task->lock);
+
#if defined(THREADED_RTS)
ACQUIRE_LOCK(&all_tasks_mutex);
#endif
@@ -2251,6 +2266,12 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
cap = rts_lock();
task = cap->running_task;
+
+ // N.B. We must stop the interval timer while we are changing the
+ // capabilities array lest handle_tick may try to context switch
+ // an old capability. See #17289.
+ stopTimer();
+
stopAllCapabilities(&cap, task);
if (new_n_capabilities < enabled_capabilities)
@@ -2333,6 +2354,8 @@ setNumCapabilities (uint32_t new_n_capabilities USED_IF_THREADS)
// Notify IO manager that the number of capabilities has changed.
rts_evalIO(&cap, ioManagerCapabilitiesChanged_closure, NULL);
+ startTimer();
+
rts_unlock(cap);
#endif // THREADED_RTS
@@ -2643,8 +2666,9 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability **pcap)
#if defined(THREADED_RTS)
void scheduleWorker (Capability *cap, Task *task)
{
- // schedule() runs without a lock.
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
cap = schedule(cap,task);
+ ASSERT_FULL_CAPABILITY_INVARIANTS(cap, task);
// On exit from schedule(), we have a Capability, but possibly not
// the same one we started with.
@@ -2704,7 +2728,7 @@ initScheduler(void)
#endif
sched_state = SCHED_RUNNING;
- recent_activity = ACTIVITY_YES;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_YES);
/* Initialise the mutex and condition variables used by
@@ -2746,8 +2770,8 @@ exitScheduler (bool wait_foreign USED_IF_THREADS)
Task *task = newBoundTask();
// If we haven't killed all the threads yet, do it now.
- if (sched_state < SCHED_SHUTTING_DOWN) {
- sched_state = SCHED_INTERRUPTING;
+ if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN) {
+ RELAXED_STORE(&sched_state, SCHED_INTERRUPTING);
nonmovingStop();
Capability *cap = task->cap;
waitForCapability(&cap,task);
diff --git a/rts/Schedule.h b/rts/Schedule.h
index a550a6763a..4c692842e7 100644
--- a/rts/Schedule.h
+++ b/rts/Schedule.h
@@ -184,10 +184,13 @@ popRunQueue (Capability *cap)
StgTSO *t = cap->run_queue_hd;
ASSERT(t != END_TSO_QUEUE);
cap->run_queue_hd = t->_link;
- if (t->_link != END_TSO_QUEUE) {
- t->_link->block_info.prev = END_TSO_QUEUE;
+
+ StgTSO *link = RELAXED_LOAD(&t->_link);
+ if (link != END_TSO_QUEUE) {
+ link->block_info.prev = END_TSO_QUEUE;
}
- t->_link = END_TSO_QUEUE; // no write barrier req'd
+ RELAXED_STORE(&t->_link, END_TSO_QUEUE); // no write barrier req'd
+
if (cap->run_queue_hd == END_TSO_QUEUE) {
cap->run_queue_tl = END_TSO_QUEUE;
}
@@ -230,12 +233,18 @@ emptyQueue (StgTSO *q)
INLINE_HEADER bool
emptyRunQueue(Capability *cap)
{
+ // Can only be called by the task owning the capability.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "emptyRunQueue");
return cap->n_run_queue == 0;
}
INLINE_HEADER void
truncateRunQueue(Capability *cap)
{
+ // Can only be called by the task owning the capability.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_hd, "truncateRunQueue");
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->run_queue_tl, "truncateRunQueue");
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->n_run_queue, "truncateRunQueue");
cap->run_queue_hd = END_TSO_QUEUE;
cap->run_queue_tl = END_TSO_QUEUE;
cap->n_run_queue = 0;
diff --git a/rts/Sparks.c b/rts/Sparks.c
index 2012b0682b..47cf310188 100644
--- a/rts/Sparks.c
+++ b/rts/Sparks.c
@@ -92,7 +92,7 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap)
SparkPool *pool;
StgClosurePtr spark, tmp, *elements;
uint32_t n, pruned_sparks; // stats only
- StgWord botInd,oldBotInd,currInd; // indices in array (always < size)
+ StgInt botInd,oldBotInd,currInd; // indices in array (always < size)
const StgInfoTable *info;
n = 0;
@@ -111,7 +111,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap)
// stealing is happening during GC.
pool->bottom -= pool->top & ~pool->moduloSize;
pool->top &= pool->moduloSize;
- pool->topBound = pool->top;
debugTrace(DEBUG_sparks,
"markSparkQueue: current spark queue len=%ld; (hd=%ld; tl=%ld)",
@@ -259,7 +258,6 @@ pruneSparkQueue (bool nonmovingMarkFinished, Capability *cap)
ASSERT(currInd == oldBotInd);
pool->top = oldBotInd; // where we started writing
- pool->topBound = pool->top;
pool->bottom = (oldBotInd <= botInd) ? botInd : (botInd + pool->size);
// first free place we did not use (corrected by wraparound)
diff --git a/rts/SpinLock.c b/rts/SpinLock.c
new file mode 100644
index 0000000000..5289694aa7
--- /dev/null
+++ b/rts/SpinLock.c
@@ -0,0 +1,41 @@
+/* ----------------------------------------------------------------------------
+ *
+ * (c) The GHC Team, 2006-2009
+ *
+ * Spin locks
+ *
+ * These are simple spin-only locks as opposed to Mutexes which
+ * probably spin for a while before blocking in the kernel. We use
+ * these when we are sure that all our threads are actively running on
+ * a CPU, eg. in the GC.
+ *
+ * TODO: measure whether we really need these, or whether Mutexes
+ * would do (and be a bit safer if a CPU becomes loaded).
+ *
+ * Do not #include this file directly: #include "Rts.h" instead.
+ *
+ * To understand the structure of the RTS headers, see the wiki:
+ * https://gitlab.haskell.org/ghc/ghc/wikis/commentary/source-tree/includes
+ *
+ * -------------------------------------------------------------------------- */
+
+#include "PosixSource.h"
+#include "Rts.h"
+
+#if defined(THREADED_RTS)
+
+void acquire_spin_lock_slow_path(SpinLock * p)
+{
+ do {
+ for (uint32_t i = 0; i < SPIN_COUNT; i++) {
+ StgWord32 r = cas((StgVolatilePtr)&(p->lock), 1, 0);
+ if (r != 0) return;
+ IF_PROF_SPIN(RELAXED_ADD(&p->spin, 1));
+ busy_wait_nop();
+ }
+ IF_PROF_SPIN(RELAXED_ADD(&p->yield, 1));
+ yieldThread();
+ } while (1);
+}
+
+#endif
diff --git a/rts/StablePtr.c b/rts/StablePtr.c
index edcd863183..469a17a5b9 100644
--- a/rts/StablePtr.c
+++ b/rts/StablePtr.c
@@ -191,9 +191,10 @@ enlargeStablePtrTable(void)
/* When using the threaded RTS, the update of stable_ptr_table is assumed to
* be atomic, so that another thread simultaneously dereferencing a stable
- * pointer will always read a valid address.
+ * pointer will always read a valid address. Release ordering to ensure
+ * that the new table is visible to others.
*/
- stable_ptr_table = new_stable_ptr_table;
+ RELEASE_STORE(&stable_ptr_table, new_stable_ptr_table);
initSpEntryFreeList(stable_ptr_table + old_SPT_size, old_SPT_size, NULL);
}
@@ -247,7 +248,7 @@ exitStablePtrTable(void)
STATIC_INLINE void
freeSpEntry(spEntry *sp)
{
- sp->addr = (P_)stable_ptr_free;
+ RELAXED_STORE(&sp->addr, (P_)stable_ptr_free);
stable_ptr_free = sp;
}
@@ -279,7 +280,7 @@ getStablePtr(StgPtr p)
if (!stable_ptr_free) enlargeStablePtrTable();
sp = stable_ptr_free - stable_ptr_table;
stable_ptr_free = (spEntry*)(stable_ptr_free->addr);
- stable_ptr_table[sp].addr = p;
+ RELAXED_STORE(&stable_ptr_table[sp].addr, p);
stablePtrUnlock();
return (StgStablePtr)(sp);
}
diff --git a/rts/Stats.c b/rts/Stats.c
index 80fca509ad..71dcf8a9d0 100644
--- a/rts/Stats.c
+++ b/rts/Stats.c
@@ -26,6 +26,11 @@
#include <string.h> // for memset
+#if defined(THREADED_RTS)
+// Protects all statistics below
+Mutex stats_mutex;
+#endif
+
static Time
start_init_cpu, start_init_elapsed,
end_init_cpu, end_init_elapsed,
@@ -81,25 +86,6 @@ Time stat_getElapsedTime(void)
Measure the current MUT time, for profiling
------------------------------------------------------------------------ */
-double
-mut_user_time_until( Time t )
-{
- return TimeToSecondsDbl(t - stats.gc_cpu_ns - stats.nonmoving_gc_cpu_ns);
- // heapCensus() time is included in GC_tot_cpu, so we don't need
- // to subtract it here.
-
- // TODO: This seems wrong to me. Surely we should be subtracting
- // (at least) start_init_cpu?
-}
-
-double
-mut_user_time( void )
-{
- Time cpu;
- cpu = getProcessCPUTime();
- return mut_user_time_until(cpu);
-}
-
#if defined(PROFILING)
/*
mut_user_time_during_RP() returns the MUT time during retainer profiling.
@@ -120,6 +106,10 @@ mut_user_time_during_RP( void )
void
initStats0(void)
{
+#if defined(THREADED_RTS)
+ initMutex(&stats_mutex);
+#endif
+
start_init_cpu = 0;
start_init_elapsed = 0;
end_init_cpu = 0;
@@ -281,9 +271,11 @@ stat_endInit(void)
void
stat_startExit(void)
{
+ ACQUIRE_LOCK(&stats_mutex);
getProcessTimes(&start_exit_cpu, &start_exit_elapsed);
start_exit_gc_elapsed = stats.gc_elapsed_ns;
start_exit_gc_cpu = stats.gc_cpu_ns;
+ RELEASE_LOCK(&stats_mutex);
}
/* -----------------------------------------------------------------------------
@@ -294,7 +286,9 @@ stat_startExit(void)
void
stat_endExit(void)
{
+ ACQUIRE_LOCK(&stats_mutex);
getProcessTimes(&end_exit_cpu, &end_exit_elapsed);
+ RELEASE_LOCK(&stats_mutex);
}
void
@@ -306,8 +300,10 @@ stat_startGCSync (gc_thread *gct)
void
stat_startNonmovingGc ()
{
+ ACQUIRE_LOCK(&stats_mutex);
start_nonmoving_gc_cpu = getCurrentThreadCPUTime();
start_nonmoving_gc_elapsed = getProcessCPUTime();
+ RELEASE_LOCK(&stats_mutex);
}
void
@@ -315,6 +311,8 @@ stat_endNonmovingGc ()
{
Time cpu = getCurrentThreadCPUTime();
Time elapsed = getProcessCPUTime();
+
+ ACQUIRE_LOCK(&stats_mutex);
stats.gc.nonmoving_gc_elapsed_ns = elapsed - start_nonmoving_gc_elapsed;
stats.nonmoving_gc_elapsed_ns += stats.gc.nonmoving_gc_elapsed_ns;
@@ -324,12 +322,15 @@ stat_endNonmovingGc ()
stats.nonmoving_gc_max_elapsed_ns =
stg_max(stats.gc.nonmoving_gc_elapsed_ns,
stats.nonmoving_gc_max_elapsed_ns);
+ RELEASE_LOCK(&stats_mutex);
}
void
stat_startNonmovingGcSync ()
{
+ ACQUIRE_LOCK(&stats_mutex);
start_nonmoving_gc_sync_elapsed = getProcessElapsedTime();
+ RELEASE_LOCK(&stats_mutex);
traceConcSyncBegin();
}
@@ -337,13 +338,17 @@ void
stat_endNonmovingGcSync ()
{
Time end_elapsed = getProcessElapsedTime();
+ ACQUIRE_LOCK(&stats_mutex);
stats.gc.nonmoving_gc_sync_elapsed_ns = end_elapsed - start_nonmoving_gc_sync_elapsed;
stats.nonmoving_gc_sync_elapsed_ns += stats.gc.nonmoving_gc_sync_elapsed_ns;
stats.nonmoving_gc_sync_max_elapsed_ns =
stg_max(stats.gc.nonmoving_gc_sync_elapsed_ns,
stats.nonmoving_gc_sync_max_elapsed_ns);
+ Time sync_elapsed = stats.gc.nonmoving_gc_sync_elapsed_ns;
+ RELEASE_LOCK(&stats_mutex);
+
if (RtsFlags.GcFlags.giveStats == VERBOSE_GC_STATS) {
- statsPrintf("# sync %6.3f\n", TimeToSecondsDbl(stats.gc.nonmoving_gc_sync_elapsed_ns));
+ statsPrintf("# sync %6.3f\n", TimeToSecondsDbl(sync_elapsed));
}
traceConcSyncEnd();
}
@@ -459,6 +464,8 @@ stat_endGC (Capability *cap, gc_thread *initiating_gct, W_ live, W_ copied, W_ s
W_ mut_spin_spin, W_ mut_spin_yield, W_ any_work, W_ no_work,
W_ scav_find_work)
{
+ ACQUIRE_LOCK(&stats_mutex);
+
// -------------------------------------------------
// Collect all the stats about this GC in stats.gc. We always do this since
// it's relatively cheap and we need allocated_bytes to catch heap
@@ -628,6 +635,7 @@ stat_endGC (Capability *cap, gc_thread *initiating_gct, W_ live, W_ copied, W_ s
CAPSET_HEAP_DEFAULT,
mblocks_allocated * MBLOCK_SIZE);
}
+ RELEASE_LOCK(&stats_mutex);
}
/* -----------------------------------------------------------------------------
@@ -640,8 +648,10 @@ stat_startRP(void)
Time user, elapsed;
getProcessTimes( &user, &elapsed );
+ ACQUIRE_LOCK(&stats_mutex);
RP_start_time = user;
RPe_start_time = elapsed;
+ RELEASE_LOCK(&stats_mutex);
}
#endif /* PROFILING */
@@ -659,11 +669,14 @@ stat_endRP(
Time user, elapsed;
getProcessTimes( &user, &elapsed );
+ ACQUIRE_LOCK(&stats_mutex);
RP_tot_time += user - RP_start_time;
RPe_tot_time += elapsed - RPe_start_time;
+ double mut_time_during_RP = mut_user_time_during_RP();
+ RELEASE_LOCK(&stats_mutex);
fprintf(prof_file, "Retainer Profiling: %d, at %f seconds\n",
- retainerGeneration, mut_user_time_during_RP());
+ retainerGeneration, mut_time_during_RP);
fprintf(prof_file, "\tMax auxiliary stack size = %u\n", maxStackSize);
fprintf(prof_file, "\tAverage number of visits per object = %f\n",
averageNumVisit);
@@ -680,8 +693,10 @@ stat_startHeapCensus(void)
Time user, elapsed;
getProcessTimes( &user, &elapsed );
+ ACQUIRE_LOCK(&stats_mutex);
HC_start_time = user;
HCe_start_time = elapsed;
+ RELEASE_LOCK(&stats_mutex);
}
#endif /* PROFILING */
@@ -695,8 +710,10 @@ stat_endHeapCensus(void)
Time user, elapsed;
getProcessTimes( &user, &elapsed );
+ ACQUIRE_LOCK(&stats_mutex);
HC_tot_time += user - HC_start_time;
HCe_tot_time += elapsed - HCe_start_time;
+ RELEASE_LOCK(&stats_mutex);
}
#endif /* PROFILING */
@@ -793,6 +810,7 @@ static void free_RTSSummaryStats(RTSSummaryStats * sum)
sum->gc_summary_stats = NULL;
}
+// Must hold stats_mutex.
static void report_summary(const RTSSummaryStats* sum)
{
// We should do no calculation, other than unit changes and formatting, and
@@ -1195,6 +1213,7 @@ static void report_machine_readable (const RTSSummaryStats * sum)
statsPrintf(" ]\n");
}
+// Must hold stats_mutex.
static void report_one_line(const RTSSummaryStats * sum)
{
// We should do no calculation, other than unit changes and formatting, and
@@ -1226,10 +1245,12 @@ static void report_one_line(const RTSSummaryStats * sum)
}
void
-stat_exit (void)
+stat_exitReport (void)
{
RTSSummaryStats sum;
init_RTSSummaryStats(&sum);
+ // We'll need to refer to task counters later
+ ACQUIRE_LOCK(&all_tasks_mutex);
if (RtsFlags.GcFlags.giveStats != NO_GC_STATS) {
// First we tidy the times in stats, and populate the times in sum.
@@ -1239,6 +1260,7 @@ stat_exit (void)
Time now_cpu_ns, now_elapsed_ns;
getProcessTimes(&now_cpu_ns, &now_elapsed_ns);
+ ACQUIRE_LOCK(&stats_mutex);
stats.cpu_ns = now_cpu_ns - start_init_cpu;
stats.elapsed_ns = now_elapsed_ns - start_init_elapsed;
/* avoid divide by zero if stats.total_cpu_ns is measured as 0.00
@@ -1427,6 +1449,7 @@ stat_exit (void)
report_one_line(&sum);
}
}
+ RELEASE_LOCK(&stats_mutex);
statsFlush();
statsClose();
@@ -1446,6 +1469,15 @@ stat_exit (void)
stgFree(GC_coll_max_pause);
GC_coll_max_pause = NULL;
}
+
+ RELEASE_LOCK(&all_tasks_mutex);
+}
+
+void stat_exit()
+{
+#if defined(THREADED_RTS)
+ closeMutex(&stats_mutex);
+#endif
}
/* Note [Work Balance]
@@ -1665,7 +1697,10 @@ statDescribeGens(void)
uint64_t getAllocations( void )
{
- return stats.allocated_bytes;
+ ACQUIRE_LOCK(&stats_mutex);
+ StgWord64 n = stats.allocated_bytes;
+ RELEASE_LOCK(&stats_mutex);
+ return n;
}
int getRTSStatsEnabled( void )
@@ -1678,7 +1713,9 @@ void getRTSStats( RTSStats *s )
Time current_elapsed = 0;
Time current_cpu = 0;
+ ACQUIRE_LOCK(&stats_mutex);
*s = stats;
+ RELEASE_LOCK(&stats_mutex);
getProcessTimes(&current_cpu, &current_elapsed);
s->cpu_ns = current_cpu - end_init_cpu;
diff --git a/rts/Stats.h b/rts/Stats.h
index f5b8ce9991..9d62acef37 100644
--- a/rts/Stats.h
+++ b/rts/Stats.h
@@ -58,6 +58,7 @@ void stat_endHeapCensus(void);
void stat_startExit(void);
void stat_endExit(void);
+void stat_exitReport(void);
void stat_exit(void);
void stat_workerStop(void);
@@ -65,9 +66,6 @@ void initStats0(void);
void initStats1(void);
void resetChildProcessStats(void);
-double mut_user_time_until(Time t);
-double mut_user_time(void);
-
void statDescribeGens( void );
Time stat_getElapsedGCTime(void);
diff --git a/rts/Task.c b/rts/Task.c
index e5963dccc6..2bd32359cc 100644
--- a/rts/Task.c
+++ b/rts/Task.c
@@ -30,6 +30,7 @@
Task *all_tasks = NULL;
// current number of bound tasks + total number of worker tasks.
+// Locks required: all_tasks_mutex.
uint32_t taskCount;
uint32_t workerCount;
uint32_t currentWorkerCount;
@@ -237,6 +238,8 @@ newTask (bool worker)
all_tasks = task;
taskCount++;
+ debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
+
if (worker) {
workerCount++;
currentWorkerCount++;
@@ -311,8 +314,6 @@ newBoundTask (void)
task->stopped = false;
newInCall(task);
-
- debugTrace(DEBUG_sched, "new task (taskCount: %d)", taskCount);
return task;
}
@@ -472,7 +473,7 @@ startWorkerTask (Capability *cap)
// else get in, because the new worker Task has nowhere to go to
// sleep so that it could be woken up again.
ASSERT_LOCK_HELD(&cap->lock);
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
// Set the name of the worker thread to the original process name followed by
// ":w", but only if we're on Linux where the program_invocation_short_name
diff --git a/rts/Task.h b/rts/Task.h
index 6e366a5d9b..9b6a8e8d7b 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -16,8 +16,8 @@
#include "BeginPrivate.h"
/*
- Definition of a Task
- --------------------
+ Note [Definition of a Task]
+ ~~~~~~~~~~~~~~~~~~~~~~~~~~~
A task is an OSThread that runs Haskell code. Every OSThread that
runs inside the RTS, whether as a worker created by the RTS or via
@@ -33,8 +33,8 @@
Haskell code simultaneously. A task relinquishes its Capability
when it is asked to evaluate an external (C) call.
- Ownership of Task
- -----------------
+ Note [Ownership of Task]
+ ~~~~~~~~~~~~~~~~~~~~~~~~
Task ownership is a little tricky. The default situation is that
the Task is an OS-thread-local structure that is owned by the OS
diff --git a/rts/ThreadPaused.c b/rts/ThreadPaused.c
index 83c621e386..13fc2b4ca0 100644
--- a/rts/ThreadPaused.c
+++ b/rts/ThreadPaused.c
@@ -243,7 +243,7 @@ threadPaused(Capability *cap, StgTSO *tso)
SET_INFO(frame, (StgInfoTable *)&stg_marked_upd_frame_info);
bh = ((StgUpdateFrame *)frame)->updatee;
- bh_info = bh->header.info;
+ bh_info = ACQUIRE_LOAD(&bh->header.info);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
updateRemembSetPushClosure(cap, (StgClosure *) bh);
}
@@ -287,7 +287,7 @@ threadPaused(Capability *cap, StgTSO *tso)
// suspended by this mechanism. See Note [AP_STACKs must be eagerly
// blackholed] for details.
if (((bh_info == &stg_BLACKHOLE_info)
- && ((StgInd*)bh)->indirectee != (StgClosure*)tso)
+ && (RELAXED_LOAD(&((StgInd*)bh)->indirectee) != (StgClosure*)tso))
|| (bh_info == &stg_WHITEHOLE_info))
{
debugTrace(DEBUG_squeeze,
@@ -331,7 +331,7 @@ threadPaused(Capability *cap, StgTSO *tso)
if (cur_bh_info != bh_info) {
bh_info = cur_bh_info;
#if defined(PROF_SPIN)
- ++whitehole_threadPaused_spin;
+ NONATOMIC_ADD(&whitehole_threadPaused_spin, 1);
#endif
busy_wait_nop();
goto retry;
@@ -351,9 +351,8 @@ threadPaused(Capability *cap, StgTSO *tso)
}
// The payload of the BLACKHOLE points to the TSO
- ((StgInd *)bh)->indirectee = (StgClosure *)tso;
- write_barrier();
- SET_INFO(bh,&stg_BLACKHOLE_info);
+ RELAXED_STORE(&((StgInd *)bh)->indirectee, (StgClosure *)tso);
+ SET_INFO_RELEASE(bh,&stg_BLACKHOLE_info);
// .. and we need a write barrier, since we just mutated the closure:
recordClosureMutated(cap,bh);
diff --git a/rts/Threads.c b/rts/Threads.c
index 54c703963e..6050549d64 100644
--- a/rts/Threads.c
+++ b/rts/Threads.c
@@ -277,8 +277,6 @@ tryWakeupThread (Capability *cap, StgTSO *tso)
msg = (MessageWakeup *)allocate(cap,sizeofW(MessageWakeup));
msg->tso = tso;
SET_HDR(msg, &stg_MSG_TRY_WAKEUP_info, CCS_SYSTEM);
- // Ensure that writes constructing Message are committed before sending.
- write_barrier();
sendMessage(cap, tso->cap, (Message*)msg);
debugTraceCap(DEBUG_sched, cap, "message: try wakeup thread %ld on cap %d",
(W_)tso->id, tso->cap->no);
@@ -355,9 +353,14 @@ unblock:
migrateThread
------------------------------------------------------------------------- */
+// Precondition: The caller must own the `from` capability.
void
migrateThread (Capability *from, StgTSO *tso, Capability *to)
{
+ // Sadly we can't assert this since migrateThread is called from
+ // scheduleDoGC, where we implicly own all capabilities.
+ //ASSERT_FULL_CAPABILITY_INVARIANTS(from, getTask());
+
traceEventMigrateThread (from, tso, to->no);
// ThreadMigrating tells the target cap that it needs to be added to
// the run queue when it receives the MSG_TRY_WAKEUP.
@@ -383,8 +386,7 @@ wakeBlockingQueue(Capability *cap, StgBlockingQueue *bq)
for (msg = bq->queue; msg != (MessageBlackHole*)END_TSO_QUEUE;
msg = msg->link) {
- i = msg->header.info;
- load_load_barrier();
+ i = ACQUIRE_LOAD(&msg->header.info);
if (i != &stg_IND_info) {
ASSERT(i == &stg_MSG_BLACKHOLE_info);
tryWakeupThread(cap,msg->tso);
@@ -414,8 +416,7 @@ checkBlockingQueues (Capability *cap, StgTSO *tso)
for (bq = tso->bq; bq != (StgBlockingQueue*)END_TSO_QUEUE; bq = next) {
next = bq->link;
- const StgInfoTable *bqinfo = bq->header.info;
- load_load_barrier(); // XXX: Is this needed?
+ const StgInfoTable *bqinfo = ACQUIRE_LOAD(&bq->header.info);
if (bqinfo == &stg_IND_info) {
// ToDo: could short it out right here, to avoid
// traversing this IND multiple times.
@@ -423,8 +424,7 @@ checkBlockingQueues (Capability *cap, StgTSO *tso)
}
p = bq->bh;
- const StgInfoTable *pinfo = p->header.info;
- load_load_barrier();
+ const StgInfoTable *pinfo = ACQUIRE_LOAD(&p->header.info);
if (pinfo != &stg_BLACKHOLE_info ||
((StgInd *)p)->indirectee != (StgClosure*)bq)
{
@@ -448,8 +448,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
StgTSO *owner;
const StgInfoTable *i;
- i = thunk->header.info;
- load_load_barrier();
+ i = ACQUIRE_LOAD(&thunk->header.info);
if (i != &stg_BLACKHOLE_info &&
i != &stg_CAF_BLACKHOLE_info &&
i != &__stg_EAGER_BLACKHOLE_info &&
@@ -469,8 +468,7 @@ updateThunk (Capability *cap, StgTSO *tso, StgClosure *thunk, StgClosure *val)
return;
}
- i = v->header.info;
- load_load_barrier();
+ i = ACQUIRE_LOAD(&v->header.info);
if (i == &stg_TSO_info) {
checkBlockingQueues(cap, tso);
return;
@@ -796,8 +794,7 @@ loop:
return true;
}
- qinfo = q->header.info;
- load_load_barrier();
+ qinfo = ACQUIRE_LOAD(&q->header.info);
if (qinfo == &stg_IND_info ||
qinfo == &stg_MSG_NULL_info) {
q = (StgMVarTSOQueue*)((StgInd*)q)->indirectee;
@@ -814,15 +811,15 @@ loop:
ASSERT(tso->block_info.closure == (StgClosure*)mvar);
// save why_blocked here, because waking up the thread destroys
// this information
- StgWord why_blocked = tso->why_blocked;
+ StgWord why_blocked = RELAXED_LOAD(&tso->why_blocked);
// actually perform the takeMVar
StgStack* stack = tso->stackobj;
- stack->sp[1] = (W_)value;
- stack->sp[0] = (W_)&stg_ret_p_info;
+ RELAXED_STORE(&stack->sp[1], (W_)value);
+ RELAXED_STORE(&stack->sp[0], (W_)&stg_ret_p_info);
// indicate that the MVar operation has now completed.
- tso->_link = (StgTSO*)&stg_END_TSO_QUEUE_closure;
+ RELEASE_STORE(&tso->_link, (StgTSO*)&stg_END_TSO_QUEUE_closure);
if ((stack->dirty & STACK_DIRTY) == 0) {
dirty_STACK(cap, stack);
diff --git a/rts/Timer.c b/rts/Timer.c
index 057cffc7c9..97d87ad989 100644
--- a/rts/Timer.c
+++ b/rts/Timer.c
@@ -25,6 +25,15 @@
#include "Capability.h"
#include "RtsSignals.h"
+// This global counter is used to allow multiple threads to stop the
+// timer temporarily with a stopTimer()/startTimer() pair. If
+// timer_enabled == 0 timer is enabled
+// timer_disabled == N, N > 0 timer is disabled by N threads
+// When timer_enabled makes a transition to 0, we enable the timer,
+// and when it makes a transition to non-0 we disable it.
+
+static StgWord timer_disabled;
+
/* ticks left before next pre-emptive context switch */
static int ticks_to_ctxt_switch = 0;
@@ -92,7 +101,9 @@ void
handle_tick(int unused STG_UNUSED)
{
handleProfTick();
- if (RtsFlags.ConcFlags.ctxtSwitchTicks > 0) {
+ if (RtsFlags.ConcFlags.ctxtSwitchTicks > 0
+ && SEQ_CST_LOAD(&timer_disabled) == 0)
+ {
ticks_to_ctxt_switch--;
if (ticks_to_ctxt_switch <= 0) {
ticks_to_ctxt_switch = RtsFlags.ConcFlags.ctxtSwitchTicks;
@@ -106,16 +117,16 @@ handle_tick(int unused STG_UNUSED)
* for threads that are deadlocked. However, ensure we wait
* at least interIdleGCWait (+RTS -Iw) between idle GCs.
*/
- switch (recent_activity) {
+ switch (SEQ_CST_LOAD(&recent_activity)) {
case ACTIVITY_YES:
- recent_activity = ACTIVITY_MAYBE_NO;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_MAYBE_NO);
idle_ticks_to_gc = RtsFlags.GcFlags.idleGCDelayTime /
RtsFlags.MiscFlags.tickInterval;
break;
case ACTIVITY_MAYBE_NO:
if (idle_ticks_to_gc == 0 && inter_gc_ticks_to_gc == 0) {
if (RtsFlags.GcFlags.doIdleGC) {
- recent_activity = ACTIVITY_INACTIVE;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_INACTIVE);
inter_gc_ticks_to_gc = RtsFlags.GcFlags.interIdleGCWait /
RtsFlags.MiscFlags.tickInterval;
#if defined(THREADED_RTS)
@@ -124,7 +135,7 @@ handle_tick(int unused STG_UNUSED)
// the GC.
#endif
} else {
- recent_activity = ACTIVITY_DONE_GC;
+ SEQ_CST_STORE(&recent_activity, ACTIVITY_DONE_GC);
// disable timer signals (see #1623, #5991, #9105)
// but only if we're not profiling (e.g. passed -h or -p RTS
// flags). If we are profiling we need to keep the timer active
@@ -148,15 +159,6 @@ handle_tick(int unused STG_UNUSED)
}
}
-// This global counter is used to allow multiple threads to stop the
-// timer temporarily with a stopTimer()/startTimer() pair. If
-// timer_enabled == 0 timer is enabled
-// timer_disabled == N, N > 0 timer is disabled by N threads
-// When timer_enabled makes a transition to 0, we enable the timer,
-// and when it makes a transition to non-0 we disable it.
-
-static StgWord timer_disabled;
-
void
initTimer(void)
{
@@ -164,7 +166,7 @@ initTimer(void)
if (RtsFlags.MiscFlags.tickInterval != 0) {
initTicker(RtsFlags.MiscFlags.tickInterval, handle_tick);
}
- timer_disabled = 1;
+ SEQ_CST_STORE(&timer_disabled, 1);
}
void
diff --git a/rts/Updates.h b/rts/Updates.h
index 608aaff524..aa5fbe0133 100644
--- a/rts/Updates.h
+++ b/rts/Updates.h
@@ -76,7 +76,6 @@ INLINE_HEADER void updateWithIndirection (Capability *cap,
/* not necessarily true: ASSERT( !closure_IND(p1) ); */
/* occurs in RaiseAsync.c:raiseAsync() */
/* See Note [Heap memory barriers] in SMP.h */
- write_barrier();
bdescr *bd = Bdescr((StgPtr)p1);
if (bd->gen_no != 0) {
IF_NONMOVING_WRITE_BARRIER_ENABLED {
@@ -88,9 +87,8 @@ INLINE_HEADER void updateWithIndirection (Capability *cap,
TICK_UPD_NEW_IND();
}
OVERWRITING_CLOSURE(p1);
- ((StgInd *)p1)->indirectee = p2;
- write_barrier();
- SET_INFO(p1, &stg_BLACKHOLE_info);
+ RELEASE_STORE(&((StgInd *)p1)->indirectee, p2);
+ SET_INFO_RELEASE(p1, &stg_BLACKHOLE_info);
LDV_RECORD_CREATE(p1);
}
diff --git a/rts/WSDeque.c b/rts/WSDeque.c
index 60b8948149..d930d848a4 100644
--- a/rts/WSDeque.c
+++ b/rts/WSDeque.c
@@ -11,7 +11,15 @@
* SPAA'05, July 2005, Las Vegas, USA.
* ACM 1-58113-986-1/05/0007
*
+ * This implementation closely follows the C11 implementation presented in
+ *
+ * N.M. Lê, A. Pop, A.Cohen, and F.Z. Nardelli. "Correct and Efficient
+ * Work-Stealing for Weak Memory Models". PPoPP'13, February 2013,
+ * ACM 978-1-4503-1922/13/02.
+ *
* Author: Jost Berthold MSRC 07-09/2008
+ * Rewritten by: Ben Gamari (Well-Typed)
+ *
*
* The DeQue is held as a circular array with known length. Positions
* of top (read-end) and bottom (write-end) always increase, and the
@@ -44,7 +52,13 @@
#include "RtsUtils.h"
#include "WSDeque.h"
-#define CASTOP(addr,old,new) ((old) == cas(((StgPtr)addr),(old),(new)))
+// Returns true on success.
+static inline bool
+cas_top(WSDeque *q, StgInt old, StgInt new)
+{
+ return (StgWord) old == cas((StgPtr) &q->top, (StgWord) old, (StgWord) new);
+}
+
/* -----------------------------------------------------------------------------
* newWSDeque
@@ -80,13 +94,12 @@ newWSDeque (uint32_t size)
"newWSDeque");
q->elements = stgMallocBytes(realsize * sizeof(StgClosurePtr), /* dataspace */
"newWSDeque:data space");
- q->top=0;
- q->bottom=0;
- q->topBound=0; /* read by writer, updated each time top is read */
-
q->size = realsize; /* power of 2 */
q->moduloSize = realsize - 1; /* n % size == n & moduloSize */
+ q->top=0;
+ RELEASE_STORE(&q->bottom, 0); /* read by writer, updated each time top is read */
+
ASSERT_WSDEQUE_INVARIANTS(q);
return q;
}
@@ -118,56 +131,31 @@ freeWSDeque (WSDeque *q)
void *
popWSDeque (WSDeque *q)
{
- /* also a bit tricky, has to avoid concurrent steal() calls by
- accessing top with cas, when there is only one element left */
- StgWord t, b;
- long currSize;
- void * removed;
-
- ASSERT_WSDEQUE_INVARIANTS(q);
-
- b = q->bottom;
-
- // "decrement b as a test, see what happens"
- b--;
- q->bottom = b;
-
- // very important that the following read of q->top does not occur
- // before the earlier write to q->bottom.
- store_load_barrier();
-
- t = q->top; /* using topBound would give an *upper* bound, we
- need a lower bound. We use the real top here, but
- can update the topBound value */
- q->topBound = t;
- currSize = (long)b - (long)t;
- if (currSize < 0) { /* was empty before decrementing b, set b
- consistently and abort */
- q->bottom = t;
- return NULL;
- }
-
- // read the element at b
- removed = q->elements[b & q->moduloSize];
-
- if (currSize > 0) { /* no danger, still elements in buffer after b-- */
- // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
- return removed;
- }
- /* otherwise, has someone meanwhile stolen the same (last) element?
- Check and increment top value to know */
- if ( !(CASTOP(&(q->top),t,t+1)) ) {
- removed = NULL; /* no success, but continue adjusting bottom */
+ StgInt b = RELAXED_LOAD(&q->bottom) - 1;
+ RELAXED_STORE(&q->bottom, b);
+ SEQ_CST_FENCE();
+ StgInt t = RELAXED_LOAD(&q->top);
+
+ void *result;
+ if (t <= b) {
+ /* Non-empty */
+ result = RELAXED_LOAD(&q->elements[b & q->moduloSize]);
+ if (t == b) {
+ /* Single last element in queue */
+ if (!cas_top(q, t, t+1)) {
+ /* Failed race */
+ result = NULL;
+ }
+
+ RELAXED_STORE(&q->bottom, b+1);
+ }
+ } else {
+ /* Empty queue */
+ result = NULL;
+ RELAXED_STORE(&q->bottom, b+1);
}
- q->bottom = t+1; /* anyway, empty now. Adjust bottom consistently. */
- q->topBound = t+1; /* ...and cached top value as well */
- ASSERT_WSDEQUE_INVARIANTS(q);
- ASSERT(q->bottom >= q->top);
-
- // debugBelch("popWSDeque: t=%ld b=%ld = %ld\n", t, b, removed);
-
- return removed;
+ return result;
}
/* -----------------------------------------------------------------------------
@@ -177,43 +165,19 @@ popWSDeque (WSDeque *q)
void *
stealWSDeque_ (WSDeque *q)
{
- void * stolen;
- StgWord b,t;
-
-// Can't do this on someone else's spark pool:
-// ASSERT_WSDEQUE_INVARIANTS(q);
-
- // NB. these loads must be ordered, otherwise there is a race
- // between steal and pop.
- t = q->top;
- load_load_barrier();
- b = q->bottom;
-
- // NB. b and t are unsigned; we need a signed value for the test
- // below, because it is possible that t > b during a
- // concurrent popWSQueue() operation.
- if ((long)b - (long)t <= 0 ) {
- return NULL; /* already looks empty, abort */
+ StgInt t = ACQUIRE_LOAD(&q->top);
+ SEQ_CST_FENCE();
+ StgInt b = ACQUIRE_LOAD(&q->bottom);
+
+ void *result = NULL;
+ if (t < b) {
+ /* Non-empty queue */
+ result = RELAXED_LOAD(&q->elements[t % q->size]);
+ if (!cas_top(q, t, t+1)) {
+ return NULL;
+ }
}
- // NB. the load of q->bottom must be ordered before the load of
- // q->elements[t & q-> moduloSize]. See comment "KG:..." below
- // and Ticket #13633.
- load_load_barrier();
- /* now access array, see pushBottom() */
- stolen = q->elements[t & q->moduloSize];
-
- /* now decide whether we have won */
- if ( !(CASTOP(&(q->top),t,t+1)) ) {
- /* lost the race, someone else has changed top in the meantime */
- return NULL;
- } /* else: OK, top has been incremented by the cas call */
-
- // debugBelch("stealWSDeque_: t=%d b=%d\n", t, b);
-
-// Can't do this on someone else's spark pool:
-// ASSERT_WSDEQUE_INVARIANTS(q);
-
- return stolen;
+ return result;
}
void *
@@ -232,67 +196,30 @@ stealWSDeque (WSDeque *q)
* pushWSQueue
* -------------------------------------------------------------------------- */
-#define DISCARD_NEW
-
-/* enqueue an element. Should always succeed by resizing the array
- (not implemented yet, silently fails in that case). */
+/* Enqueue an element. Must only be called by owner. Returns true if element was
+ * pushed, false if queue is full
+ */
bool
pushWSDeque (WSDeque* q, void * elem)
{
- StgWord t;
- StgWord b;
- StgWord sz = q->moduloSize;
+ StgInt b = ACQUIRE_LOAD(&q->bottom);
+ StgInt t = ACQUIRE_LOAD(&q->top);
- ASSERT_WSDEQUE_INVARIANTS(q);
-
- /* we try to avoid reading q->top (accessed by all) and use
- q->topBound (accessed only by writer) instead.
- This is why we do not just call empty(q) here.
- */
- b = q->bottom;
- t = q->topBound;
- if ( (StgInt)b - (StgInt)t >= (StgInt)sz ) {
- /* NB. 1. sz == q->size - 1, thus ">="
- 2. signed comparison, it is possible that t > b
- */
- /* could be full, check the real top value in this case */
- t = q->top;
- q->topBound = t;
- if (b - t >= sz) { /* really no space left :-( */
- /* reallocate the array, copying the values. Concurrent steal()s
- will in the meantime use the old one and modify only top.
- This means: we cannot safely free the old space! Can keep it
- on a free list internally here...
+ if ( b - t > q->size - 1 ) {
+ /* Full queue */
+ /* We don't implement resizing, just say we didn't push anything. */
+ return false;
+ }
- Potential bug in combination with steal(): if array is
- replaced, it is unclear which one concurrent steal operations
- use. Must read the array base address in advance in steal().
- */
-#if defined(DISCARD_NEW)
- ASSERT_WSDEQUE_INVARIANTS(q);
- return false; // we didn't push anything
+ RELAXED_STORE(&q->elements[b & q->moduloSize], elem);
+#if defined(TSAN_ENABLED)
+ // ThreadSanizer doesn't know about release fences, so we need to
+ // strengthen this to a release store lest we get spurious data race
+ // reports.
+ RELEASE_STORE(&q->bottom, b+1);
#else
- /* could make room by incrementing the top position here. In
- * this case, should use CASTOP. If this fails, someone else has
- * removed something, and new room will be available.
- */
- ASSERT_WSDEQUE_INVARIANTS(q);
+ RELEASE_FENCE();
+ RELAXED_STORE(&q->bottom, b+1);
#endif
- }
- }
-
- q->elements[b & sz] = elem;
- /*
- KG: we need to put write barrier here since otherwise we might
- end with elem not added to q->elements, but q->bottom already
- modified (write reordering) and with stealWSDeque_ failing
- later when invoked from another thread since it thinks elem is
- there (in case there is just added element in the queue). This
- issue concretely hit me on ARMv7 multi-core CPUs
- */
- write_barrier();
- q->bottom = b + 1;
-
- ASSERT_WSDEQUE_INVARIANTS(q);
return true;
}
diff --git a/rts/WSDeque.h b/rts/WSDeque.h
index 2936c281fe..0104884bdb 100644
--- a/rts/WSDeque.h
+++ b/rts/WSDeque.h
@@ -11,24 +11,19 @@
typedef struct WSDeque_ {
// Size of elements array. Used for modulo calculation: we round up
// to powers of 2 and use the dyadic log (modulo == bitwise &)
- StgWord size;
+ StgInt size;
StgWord moduloSize; /* bitmask for modulo */
// top, index where multiple readers steal() (protected by a cas)
- volatile StgWord top;
+ StgInt top;
// bottom, index of next free place where one writer can push
// elements. This happens unsynchronised.
- volatile StgWord bottom;
+ StgInt bottom;
// both top and bottom are continuously incremented, and used as
// an index modulo the current array size.
- // lower bound on the current top value. This is an internal
- // optimisation to avoid unnecessarily accessing the top field
- // inside pushBottom
- volatile StgWord topBound;
-
// The elements array
void ** elements;
@@ -39,18 +34,17 @@ typedef struct WSDeque_ {
} WSDeque;
/* INVARIANTS, in this order: reasonable size,
- topBound consistent, space pointer, space accessible to us.
+ space pointer, space accessible to us.
NB. This is safe to use only (a) on a spark pool owned by the
current thread, or (b) when there's only one thread running, or no
stealing going on (e.g. during GC).
*/
-#define ASSERT_WSDEQUE_INVARIANTS(p) \
- ASSERT((p)->size > 0); \
- ASSERT((p)->topBound <= (p)->top); \
- ASSERT((p)->elements != NULL); \
- ASSERT(*((p)->elements) || 1); \
- ASSERT(*((p)->elements - 1 + ((p)->size)) || 1);
+#define ASSERT_WSDEQUE_INVARIANTS(p) \
+ ASSERT((p)->size > 0); \
+ ASSERT(RELAXED_LOAD(&(p)->elements) != NULL); \
+ ASSERT(RELAXED_LOAD(&(p)->elements[0]) || 1); \
+ ASSERT(RELAXED_LOAD(&(p)->elements[(p)->size - 1]) || 1);
// No: it is possible that top > bottom when using pop()
// ASSERT((p)->bottom >= (p)->top);
@@ -69,15 +63,15 @@ typedef struct WSDeque_ {
WSDeque * newWSDeque (uint32_t size);
void freeWSDeque (WSDeque *q);
-// Take an element from the "write" end of the pool. Can be called
+// (owner-only) Take an element from the "write" end of the pool. Can be called
// by the pool owner only.
void* popWSDeque (WSDeque *q);
-// Push onto the "write" end of the pool. Return true if the push
+// (owner-only) Push onto the "write" end of the pool. Return true if the push
// succeeded, or false if the deque is full.
bool pushWSDeque (WSDeque *q, void *elem);
-// Removes all elements from the deque
+// (owner-only) Removes all elements from the deque.
EXTERN_INLINE void discardElements (WSDeque *q);
// Removes an element of the deque from the "read" end, or returns
@@ -90,23 +84,27 @@ void * stealWSDeque_ (WSDeque *q);
void * stealWSDeque (WSDeque *q);
// "guesses" whether a deque is empty. Can return false negatives in
-// presence of concurrent steal() calls, and false positives in
-// presence of a concurrent pushBottom().
+// presence of concurrent steal() calls, and false positives in
+// presence of a concurrent pushBottom().
EXTERN_INLINE bool looksEmptyWSDeque (WSDeque* q);
-EXTERN_INLINE long dequeElements (WSDeque *q);
+// "guesses" how many elements are present on the deque. Like
+// looksEmptyWSDeque, this may suggest that the deque is empty when it's not
+// and vice-versa.
+EXTERN_INLINE StgInt dequeElements (WSDeque *q);
/* -----------------------------------------------------------------------------
* PRIVATE below here
* -------------------------------------------------------------------------- */
-EXTERN_INLINE long
+EXTERN_INLINE StgInt
dequeElements (WSDeque *q)
{
- StgWord t = q->top;
- StgWord b = q->bottom;
+ StgWord t = ACQUIRE_LOAD(&q->top);
+ StgWord b = ACQUIRE_LOAD(&q->bottom);
// try to prefer false negatives by reading top first
- return ((long)b - (long)t);
+ StgInt n = (StgInt)b - (StgInt)t;
+ return n > 0 ? n : 0;
}
EXTERN_INLINE bool
@@ -118,6 +116,5 @@ looksEmptyWSDeque (WSDeque *q)
EXTERN_INLINE void
discardElements (WSDeque *q)
{
- q->top = q->bottom;
-// pool->topBound = pool->top;
+ RELAXED_STORE(&q->top, RELAXED_LOAD(&q->bottom));
}
diff --git a/rts/Weak.c b/rts/Weak.c
index fe4516794a..0adf5a8b92 100644
--- a/rts/Weak.c
+++ b/rts/Weak.c
@@ -57,8 +57,7 @@ runAllCFinalizers(StgWeak *list)
// If there's no major GC between the time that the finalizer for the
// object from the oldest generation is manually called and shutdown
// we end up running the same finalizer twice. See #7170.
- const StgInfoTable *winfo = w->header.info;
- load_load_barrier();
+ const StgInfoTable *winfo = ACQUIRE_LOAD(&w->header.info);
if (winfo != &stg_DEAD_WEAK_info) {
runCFinalizers((StgCFinalizerList *)w->cfinalizers);
}
@@ -93,10 +92,10 @@ scheduleFinalizers(Capability *cap, StgWeak *list)
StgWord size;
uint32_t n, i;
- // This assertion does not hold with non-moving collection because
- // non-moving collector does not wait for the list to be consumed (by
- // doIdleGcWork()) before appending the list with more finalizers.
- ASSERT(RtsFlags.GcFlags.useNonmoving || n_finalizers == 0);
+ // n_finalizers is not necessarily zero under non-moving collection
+ // because non-moving collector does not wait for the list to be consumed
+ // (by doIdleGcWork()) before appending the list with more finalizers.
+ ASSERT(RtsFlags.GcFlags.useNonmoving || SEQ_CST_LOAD(&n_finalizers) == 0);
// Append finalizer_list with the new list. TODO: Perhaps cache tail of the
// list for faster append. NOTE: We can't append `list` here! Otherwise we
@@ -105,7 +104,7 @@ scheduleFinalizers(Capability *cap, StgWeak *list)
while (*tl) {
tl = &(*tl)->link;
}
- *tl = list;
+ SEQ_CST_STORE(tl, list);
// Traverse the list and
// * count the number of Haskell finalizers
@@ -140,7 +139,7 @@ scheduleFinalizers(Capability *cap, StgWeak *list)
SET_HDR(w, &stg_DEAD_WEAK_info, w->header.prof.ccs);
}
- n_finalizers += i;
+ SEQ_CST_ADD(&n_finalizers, i);
// No Haskell finalizers to run?
if (n == 0) return;
@@ -226,7 +225,7 @@ static volatile StgWord finalizer_lock = 0;
//
bool runSomeFinalizers(bool all)
{
- if (n_finalizers == 0)
+ if (RELAXED_LOAD(&n_finalizers) == 0)
return false;
if (cas(&finalizer_lock, 0, 1) != 0) {
@@ -252,17 +251,15 @@ bool runSomeFinalizers(bool all)
if (!all && count >= finalizer_chunk) break;
}
- finalizer_list = w;
- n_finalizers -= count;
+ RELAXED_STORE(&finalizer_list, w);
+ SEQ_CST_ADD(&n_finalizers, -count);
if (task != NULL) {
task->running_finalizers = false;
}
debugTrace(DEBUG_sched, "ran %d C finalizers", count);
-
- write_barrier();
- finalizer_lock = 0;
-
- return n_finalizers != 0;
+ bool ret = n_finalizers != 0;
+ RELEASE_STORE(&finalizer_lock, 0);
+ return ret;
}
diff --git a/rts/posix/GetTime.c b/rts/posix/GetTime.c
index 0128e3bc8b..7d53f95401 100644
--- a/rts/posix/GetTime.c
+++ b/rts/posix/GetTime.c
@@ -85,7 +85,9 @@ Time getCurrentThreadCPUTime(void)
defined(CLOCK_PROCESS_CPUTIME_ID) && \
defined(HAVE_SYSCONF)
static bool have_checked_usability = false;
- if (!have_checked_usability) {
+ // The RELAXED operation is fine here as it's okay if we do the check below
+ // more than once.
+ if (!RELAXED_LOAD(&have_checked_usability)) {
// The Linux clock_getres(2) manpage claims that some early versions of
// Linux will return values which are uninterpretable in the presence
// of migration across CPUs. They claim that clock_getcpuclockid(0)
@@ -95,7 +97,7 @@ Time getCurrentThreadCPUTime(void)
sysErrorBelch("getCurrentThreadCPUTime: no supported");
stg_exit(EXIT_FAILURE);
}
- have_checked_usability = true;
+ RELAXED_STORE(&have_checked_usability, true);
}
return getClockTime(CLOCK_THREAD_CPUTIME_ID);
#else
diff --git a/rts/posix/OSThreads.c b/rts/posix/OSThreads.c
index 42d72c2e0d..6347e8ce7a 100644
--- a/rts/posix/OSThreads.c
+++ b/rts/posix/OSThreads.c
@@ -240,13 +240,14 @@ forkOS_createThread ( HsStablePtr entry )
void freeThreadingResources (void) { /* nothing */ }
+static uint32_t nproc_cache = 0;
+
// Get the number of logical CPU cores available to us. Note that this is
// different from the number of physical cores (see #14781).
uint32_t
getNumberOfProcessors (void)
{
- static uint32_t nproc = 0;
-
+ uint32_t nproc = RELAXED_LOAD(&nproc_cache);
if (nproc == 0) {
#if defined(HAVE_SCHED_GETAFFINITY)
cpu_set_t mask;
@@ -287,6 +288,7 @@ getNumberOfProcessors (void)
#else
nproc = 1;
#endif
+ RELAXED_STORE(&nproc_cache, nproc);
}
return nproc;
@@ -396,6 +398,14 @@ interruptOSThread (OSThreadId id)
pthread_kill(id, SIGPIPE);
}
+void
+joinOSThread (OSThreadId id)
+{
+ if (pthread_join(id, NULL) != 0) {
+ sysErrorBelch("joinOSThread: error %d", errno);
+ }
+}
+
KernelThreadId kernelThreadId (void)
{
#if defined(linux_HOST_OS)
diff --git a/rts/posix/Signals.c b/rts/posix/Signals.c
index 2e534042f3..5ad688bc2f 100644
--- a/rts/posix/Signals.c
+++ b/rts/posix/Signals.c
@@ -128,7 +128,7 @@ more_handlers(int sig)
}
// Here's the pipe into which we will send our signals
-static volatile int io_manager_wakeup_fd = -1;
+static int io_manager_wakeup_fd = -1;
static int timer_manager_control_wr_fd = -1;
#define IO_MANAGER_WAKEUP 0xff
@@ -136,7 +136,7 @@ static int timer_manager_control_wr_fd = -1;
#define IO_MANAGER_SYNC 0xfd
void setTimerManagerControlFd(int fd) {
- timer_manager_control_wr_fd = fd;
+ RELAXED_STORE(&timer_manager_control_wr_fd, fd);
}
void
@@ -144,7 +144,7 @@ setIOManagerWakeupFd (int fd)
{
// only called when THREADED_RTS, but unconditionally
// compiled here because GHC.Event.Control depends on it.
- io_manager_wakeup_fd = fd;
+ SEQ_CST_STORE(&io_manager_wakeup_fd, fd);
}
/* -----------------------------------------------------------------------------
@@ -154,14 +154,15 @@ void
ioManagerWakeup (void)
{
int r;
+ const int wakeup_fd = SEQ_CST_LOAD(&io_manager_wakeup_fd);
// Wake up the IO Manager thread by sending a byte down its pipe
- if (io_manager_wakeup_fd >= 0) {
+ if (wakeup_fd >= 0) {
#if defined(HAVE_EVENTFD)
StgWord64 n = (StgWord64)IO_MANAGER_WAKEUP;
- r = write(io_manager_wakeup_fd, (char *) &n, 8);
+ r = write(wakeup_fd, (char *) &n, 8);
#else
StgWord8 byte = (StgWord8)IO_MANAGER_WAKEUP;
- r = write(io_manager_wakeup_fd, &byte, 1);
+ r = write(wakeup_fd, &byte, 1);
#endif
/* N.B. If the TimerManager is shutting down as we run this
* then there is a possibility that our first read of
@@ -174,7 +175,7 @@ ioManagerWakeup (void)
* Since this is not an error condition, we do not print the error
* message in this case.
*/
- if (r == -1 && io_manager_wakeup_fd >= 0) {
+ if (r == -1 && SEQ_CST_LOAD(&io_manager_wakeup_fd) >= 0) {
sysErrorBelch("ioManagerWakeup: write");
}
}
@@ -186,21 +187,27 @@ ioManagerDie (void)
{
StgWord8 byte = (StgWord8)IO_MANAGER_DIE;
uint32_t i;
- int fd;
int r;
- if (0 <= timer_manager_control_wr_fd) {
- r = write(timer_manager_control_wr_fd, &byte, 1);
- if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
- timer_manager_control_wr_fd = -1;
- }
-
- for (i=0; i < n_capabilities; i++) {
- fd = capabilities[i]->io_manager_control_wr_fd;
+ {
+ // Shut down timer manager
+ const int fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
if (0 <= fd) {
r = write(fd, &byte, 1);
if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
- capabilities[i]->io_manager_control_wr_fd = -1;
+ RELAXED_STORE(&timer_manager_control_wr_fd, -1);
+ }
+ }
+
+ {
+ // Shut down IO managers
+ for (i=0; i < n_capabilities; i++) {
+ const int fd = RELAXED_LOAD(&capabilities[i]->io_manager_control_wr_fd);
+ if (0 <= fd) {
+ r = write(fd, &byte, 1);
+ if (r == -1) { sysErrorBelch("ioManagerDie: write"); }
+ RELAXED_STORE(&capabilities[i]->io_manager_control_wr_fd, -1);
+ }
}
}
}
@@ -216,7 +223,7 @@ ioManagerStart (void)
{
// Make sure the IO manager thread is running
Capability *cap;
- if (timer_manager_control_wr_fd < 0 || io_manager_wakeup_fd < 0) {
+ if (SEQ_CST_LOAD(&timer_manager_control_wr_fd) < 0 || SEQ_CST_LOAD(&io_manager_wakeup_fd) < 0) {
cap = rts_lock();
ioManagerStartCap(&cap);
rts_unlock(cap);
@@ -258,9 +265,10 @@ generic_handler(int sig USED_IF_THREADS,
memcpy(buf+1, info, sizeof(siginfo_t));
}
- if (0 <= timer_manager_control_wr_fd)
+ int timer_control_fd = RELAXED_LOAD(&timer_manager_control_wr_fd);
+ if (0 <= timer_control_fd)
{
- r = write(timer_manager_control_wr_fd, buf, sizeof(siginfo_t)+1);
+ r = write(timer_control_fd, buf, sizeof(siginfo_t)+1);
if (r == -1 && errno == EAGAIN) {
errorBelch("lost signal due to full pipe: %d\n", sig);
}
diff --git a/rts/posix/itimer/Pthread.c b/rts/posix/itimer/Pthread.c
index dd36137b72..82379b9172 100644
--- a/rts/posix/itimer/Pthread.c
+++ b/rts/posix/itimer/Pthread.c
@@ -85,11 +85,11 @@ static Time itimer_interval = DEFAULT_TICK_INTERVAL;
// Should we be firing ticks?
// Writers to this must hold the mutex below.
-static volatile bool stopped = false;
+static bool stopped = false;
// should the ticker thread exit?
// This can be set without holding the mutex.
-static volatile bool exited = true;
+static bool exited = true;
// Signaled when we want to (re)start the timer
static Condition start_cond;
@@ -120,7 +120,9 @@ static void *itimer_thread_func(void *_handle_tick)
}
#endif
- while (!exited) {
+ // Relaxed is sufficient: If we don't see that exited was set in one iteration we will
+ // see it next time.
+ while (!RELAXED_LOAD(&exited)) {
if (USE_TIMERFD_FOR_ITIMER) {
ssize_t r = read(timerfd, &nticks, sizeof(nticks));
if ((r == 0) && (errno == 0)) {
@@ -142,7 +144,8 @@ static void *itimer_thread_func(void *_handle_tick)
}
// first try a cheap test
- if (stopped) {
+ TSAN_ANNOTATE_BENIGN_RACE(&stopped, "itimer_thread_func");
+ if (RELAXED_LOAD(&stopped)) {
OS_ACQUIRE_LOCK(&mutex);
// should we really stop?
if (stopped) {
@@ -186,7 +189,7 @@ void
startTicker(void)
{
OS_ACQUIRE_LOCK(&mutex);
- stopped = 0;
+ RELAXED_STORE(&stopped, false);
signalCondition(&start_cond);
OS_RELEASE_LOCK(&mutex);
}
@@ -196,7 +199,7 @@ void
stopTicker(void)
{
OS_ACQUIRE_LOCK(&mutex);
- stopped = 1;
+ RELAXED_STORE(&stopped, true);
OS_RELEASE_LOCK(&mutex);
}
@@ -204,8 +207,8 @@ stopTicker(void)
void
exitTicker (bool wait)
{
- ASSERT(!exited);
- exited = true;
+ ASSERT(!SEQ_CST_LOAD(&exited));
+ SEQ_CST_STORE(&exited, true);
// ensure that ticker wakes up if stopped
startTicker();
diff --git a/rts/rts.cabal.in b/rts/rts.cabal.in
index 9860951226..12a4d68e4a 100644
--- a/rts/rts.cabal.in
+++ b/rts/rts.cabal.in
@@ -43,6 +43,12 @@ flag logging
default: False
flag dynamic
default: False
+flag thread-sanitizer
+ description:
+ Enable checking for data races using the ThreadSanitizer (TSAN)
+ mechanism supported by GCC and Clang. See Note [ThreadSanitizer]
+ in @includes/rts/TSANUtils.h@.
+ default: False
library
-- rts is a wired in package and
@@ -78,6 +84,11 @@ library
if flag(dynamic)
extra-dynamic-library-flavours: _thr
+ if flag(thread-sanitizer)
+ cc-options: -fsanitize=thread
+ ld-options: -fsanitize=thread
+ extra-libraries: tsan
+
exposed: True
exposed-modules:
if flag(libm)
@@ -165,6 +176,7 @@ library
rts/Ticky.h
rts/Time.h
rts/Timer.h
+ rts/TSANUtils.h
rts/Types.h
rts/Utils.h
rts/prof/CCS.h
@@ -450,6 +462,7 @@ library
STM.c
Schedule.c
Sparks.c
+ SpinLock.c
StableName.c
StablePtr.c
StaticPtrTable.c
diff --git a/rts/sm/BlockAlloc.c b/rts/sm/BlockAlloc.c
index 2bf497197e..451c182ac3 100644
--- a/rts/sm/BlockAlloc.c
+++ b/rts/sm/BlockAlloc.c
@@ -787,6 +787,26 @@ free_mega_group (bdescr *mg)
}
+/* Note [Data races in freeGroup]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ * freeGroup commits a rather serious concurrency sin in its block coalescence
+ * logic: When freeing a block it looks at bd->free of the previous/next block
+ * to see whether it is allocated. However, the free'ing thread likely does not
+ * own the previous/next block, nor do we make any attempt to synchronize with
+ * the thread that *does* own it; this makes this access a data race.
+ *
+ * The original design argued that this was correct because `bd->free` will
+ * only take a value of -1 when the block is free and thereby owned by the
+ * storage manager. However, this is nevertheless unsafe under the C11 data
+ * model, which guarantees no particular semantics for data races.
+ *
+ * We currently assume (and hope) we won't see torn values and consequently
+ * we will never see `bd->free == -1` for an allocated block which we do not
+ * own. However, this is all extremely dodgy.
+ *
+ * This is tracked as #18913.
+ */
+
void
freeGroup(bdescr *p)
{
@@ -796,7 +816,7 @@ freeGroup(bdescr *p)
// not true in multithreaded GC:
// ASSERT_SM_LOCK();
- ASSERT(p->free != (P_)-1);
+ ASSERT(RELAXED_LOAD(&p->free) != (P_)-1);
#if defined(DEBUG)
for (uint32_t i=0; i < p->blocks; i++) {
@@ -806,9 +826,9 @@ freeGroup(bdescr *p)
node = p->node;
- p->free = (void *)-1; /* indicates that this block is free */
- p->gen = NULL;
- p->gen_no = 0;
+ RELAXED_STORE(&p->free, (void *) -1); /* indicates that this block is free */
+ RELAXED_STORE(&p->gen, NULL);
+ RELAXED_STORE(&p->gen_no, 0);
/* fill the block group with garbage if sanity checking is on */
IF_DEBUG(zero_on_gc, memset(p->start, 0xaa, (W_)p->blocks * BLOCK_SIZE));
@@ -834,7 +854,11 @@ freeGroup(bdescr *p)
{
bdescr *next;
next = p + p->blocks;
- if (next <= LAST_BDESCR(MBLOCK_ROUND_DOWN(p)) && next->free == (P_)-1)
+
+ // See Note [Data races in freeGroup].
+ TSAN_ANNOTATE_BENIGN_RACE(&next->free, "freeGroup");
+ if (next <= LAST_BDESCR(MBLOCK_ROUND_DOWN(p))
+ && RELAXED_LOAD(&next->free) == (P_)-1)
{
p->blocks += next->blocks;
ln = log_2(next->blocks);
@@ -855,7 +879,9 @@ freeGroup(bdescr *p)
prev = p - 1;
if (prev->blocks == 0) prev = prev->link; // find the head
- if (prev->free == (P_)-1)
+ // See Note [Data races in freeGroup].
+ TSAN_ANNOTATE_BENIGN_RACE(&prev->free, "freeGroup");
+ if (RELAXED_LOAD(&prev->free) == (P_)-1)
{
ln = log_2(prev->blocks);
dbl_link_remove(prev, &free_list[node][ln]);
diff --git a/rts/sm/CNF.c b/rts/sm/CNF.c
index 2c701c2c29..31b3cb99f2 100644
--- a/rts/sm/CNF.c
+++ b/rts/sm/CNF.c
@@ -245,6 +245,7 @@ compactAllocateBlockInternal(Capability *cap,
initBdescr(head, g, g);
head->flags = BF_COMPACT;
for (block = head + 1, n_blocks --; n_blocks > 0; block++, n_blocks--) {
+ initBdescr(block, g, g);
block->link = head;
block->blocks = 0;
block->flags = BF_COMPACT;
diff --git a/rts/sm/Evac.c b/rts/sm/Evac.c
index 0ece06016a..b324a59179 100644
--- a/rts/sm/Evac.c
+++ b/rts/sm/Evac.c
@@ -171,7 +171,11 @@ copy_tag(StgClosure **p, const StgInfoTable *info,
#endif
return evacuate(p); // does the failed_to_evac stuff
} else {
- *p = TAG_CLOSURE(tag,(StgClosure*)to);
+ // This doesn't need to have RELEASE ordering since we are guaranteed
+ // to scavenge the to-space object on the current core therefore
+ // no-one else will follow this pointer (FIXME: Is this true in
+ // light of the selector optimization?).
+ RELEASE_STORE(p, TAG_CLOSURE(tag,(StgClosure*)to));
}
}
#else
@@ -206,9 +210,9 @@ copy_tag_nolock(StgClosure **p, const StgInfoTable *info,
// if somebody else reads the forwarding pointer, we better make
// sure there's a closure at the end of it.
- write_barrier();
- *p = TAG_CLOSURE(tag,(StgClosure*)to);
- src->header.info = (const StgInfoTable *)MK_FORWARDING_PTR(to);
+ RELEASE_STORE(p, TAG_CLOSURE(tag,(StgClosure*)to));
+ RELEASE_STORE(&src->header.info, \
+ (const StgInfoTable *)MK_FORWARDING_PTR(to));
// if (to+size+2 < bd->start + BLOCK_SIZE_W) {
// __builtin_prefetch(to + size + 2, 1);
@@ -245,7 +249,7 @@ spin:
goto spin;
}
if (IS_FORWARDING_PTR(info)) {
- src->header.info = (const StgInfoTable *)info;
+ RELEASE_STORE(&src->header.info, (const StgInfoTable *)info);
evacuate(p); // does the failed_to_evac stuff
return false;
}
@@ -261,9 +265,8 @@ spin:
to[i] = from[i];
}
- write_barrier();
- *p = (StgClosure *)to;
- src->header.info = (const StgInfoTable*)MK_FORWARDING_PTR(to);
+ RELEASE_STORE(p, (StgClosure *) to);
+ RELEASE_STORE(&src->header.info, (const StgInfoTable*)MK_FORWARDING_PTR(to));
#if defined(PROFILING)
// We store the size of the just evacuated object in the LDV word so that
@@ -306,12 +309,12 @@ evacuate_large(StgPtr p)
gen_workspace *ws;
bd = Bdescr(p);
- gen = bd->gen;
- gen_no = bd->gen_no;
+ gen = RELAXED_LOAD(&bd->gen);
+ gen_no = RELAXED_LOAD(&bd->gen_no);
ACQUIRE_SPIN_LOCK(&gen->sync);
// already evacuated?
- if (bd->flags & BF_EVACUATED) {
+ if (RELAXED_LOAD(&bd->flags) & BF_EVACUATED) {
/* Don't forget to set the gct->failed_to_evac flag if we didn't get
* the desired destination (see comments in evacuate()).
*/
@@ -344,9 +347,9 @@ evacuate_large(StgPtr p)
ws = &gct->gens[new_gen_no];
new_gen = &generations[new_gen_no];
- bd->flags |= BF_EVACUATED;
+ __atomic_fetch_or(&bd->flags, BF_EVACUATED, __ATOMIC_ACQ_REL);
if (RTS_UNLIKELY(RtsFlags.GcFlags.useNonmoving && new_gen == oldest_gen)) {
- bd->flags |= BF_NONMOVING;
+ __atomic_fetch_or(&bd->flags, BF_NONMOVING, __ATOMIC_ACQ_REL);
}
initBdescr(bd, new_gen, new_gen->to);
@@ -354,7 +357,7 @@ evacuate_large(StgPtr p)
// these objects, because they aren't allowed to contain any outgoing
// pointers. For these blocks, we skip the scavenge stage and put
// them straight on the scavenged_large_objects list.
- if (bd->flags & BF_PINNED) {
+ if (RELAXED_LOAD(&bd->flags) & BF_PINNED) {
ASSERT(get_itbl((StgClosure *)p)->type == ARR_WORDS);
if (new_gen != gen) { ACQUIRE_SPIN_LOCK(&new_gen->sync); }
@@ -389,7 +392,7 @@ evacuate_static_object (StgClosure **link_field, StgClosure *q)
return;
}
- StgWord link = (StgWord)*link_field;
+ StgWord link = RELAXED_LOAD((StgWord*) link_field);
// See Note [STATIC_LINK fields] for how the link field bits work
if (((link & STATIC_BITS) | prev_static_flag) != 3) {
@@ -435,7 +438,7 @@ evacuate_compact (StgPtr p)
bd = Bdescr((StgPtr)str);
gen_no = bd->gen_no;
- if (bd->flags & BF_NONMOVING) {
+ if (RELAXED_LOAD(&bd->flags) & BF_NONMOVING) {
// We may have evacuated the block to the nonmoving generation. If so
// we need to make sure it is added to the mark queue since the only
// reference to it may be from the moving heap.
@@ -500,7 +503,7 @@ evacuate_compact (StgPtr p)
// in the GC, and that should never see blocks other than the first)
bd->flags |= BF_EVACUATED;
if (RTS_UNLIKELY(RtsFlags.GcFlags.useNonmoving && new_gen == oldest_gen)) {
- bd->flags |= BF_NONMOVING;
+ __atomic_fetch_or(&bd->flags, BF_NONMOVING, __ATOMIC_RELAXED);
}
initBdescr(bd, new_gen, new_gen->to);
@@ -581,7 +584,7 @@ evacuate(StgClosure **p)
const StgInfoTable *info;
StgWord tag;
- q = *p;
+ q = RELAXED_LOAD(p);
loop:
/* The tag and the pointer are split, to be merged after evacing */
@@ -638,10 +641,11 @@ loop:
bd = Bdescr((P_)q);
- if ((bd->flags & (BF_LARGE | BF_MARKED | BF_EVACUATED | BF_COMPACT | BF_NONMOVING)) != 0) {
+ uint16_t flags = RELAXED_LOAD(&bd->flags);
+ if ((flags & (BF_LARGE | BF_MARKED | BF_EVACUATED | BF_COMPACT | BF_NONMOVING)) != 0) {
// Pointer to non-moving heap. Non-moving heap is collected using
// mark-sweep so this object should be marked and then retained in sweep.
- if (RTS_UNLIKELY(bd->flags & BF_NONMOVING)) {
+ if (RTS_UNLIKELY(RELAXED_LOAD(&bd->flags) & BF_NONMOVING)) {
// NOTE: large objects in nonmoving heap are also marked with
// BF_NONMOVING. Those are moved to scavenged_large_objects list in
// mark phase.
@@ -656,11 +660,11 @@ loop:
// happen often, but allowing it makes certain things a bit
// easier; e.g. scavenging an object is idempotent, so it's OK to
// have an object on the mutable list multiple times.
- if (bd->flags & BF_EVACUATED) {
+ if (flags & BF_EVACUATED) {
// We aren't copying this object, so we have to check
// whether it is already in the target generation. (this is
// the write barrier).
- if (bd->gen_no < gct->evac_gen_no) {
+ if (RELAXED_LOAD(&bd->gen_no) < gct->evac_gen_no) {
gct->failed_to_evac = true;
TICK_GC_FAILED_PROMOTION();
}
@@ -671,20 +675,20 @@ loop:
// right thing for objects that are half way in the middle of the first
// block of a compact (and would be treated as large objects even though
// they are not)
- if (bd->flags & BF_COMPACT) {
+ if (flags & BF_COMPACT) {
evacuate_compact((P_)q);
return;
}
/* evacuate large objects by re-linking them onto a different list.
*/
- if (bd->flags & BF_LARGE) {
+ if (flags & BF_LARGE) {
evacuate_large((P_)q);
// We may have evacuated the block to the nonmoving generation. If so
// we need to make sure it is added to the mark queue since the only
// reference to it may be from the moving heap.
- if (major_gc && bd->flags & BF_NONMOVING && !deadlock_detect_gc) {
+ if (major_gc && flags & BF_NONMOVING && !deadlock_detect_gc) {
markQueuePushClosureGC(&gct->cap->upd_rem_set.queue, q);
}
return;
@@ -702,7 +706,7 @@ loop:
gen_no = bd->dest_no;
- info = q->header.info;
+ info = ACQUIRE_LOAD(&q->header.info);
if (IS_FORWARDING_PTR(info))
{
/* Already evacuated, just return the forwarding address.
@@ -722,9 +726,12 @@ loop:
* check if gen is too low.
*/
StgClosure *e = (StgClosure*)UN_FORWARDING_PTR(info);
- *p = TAG_CLOSURE(tag,e);
+ RELAXED_STORE(p, TAG_CLOSURE(tag,e));
if (gen_no < gct->evac_gen_no) { // optimisation
- if (Bdescr((P_)e)->gen_no < gct->evac_gen_no) {
+ // The ACQUIRE here is necessary to ensure that we see gen_no if the
+ // evacuted object lives in a block newly-allocated by a GC thread on
+ // another core.
+ if (ACQUIRE_LOAD(&Bdescr((P_)e)->gen_no) < gct->evac_gen_no) {
gct->failed_to_evac = true;
TICK_GC_FAILED_PROMOTION();
}
@@ -752,15 +759,17 @@ loop:
if (info == Czh_con_info &&
// unsigned, so always true: (StgChar)w >= MIN_CHARLIKE &&
(StgChar)w <= MAX_CHARLIKE) {
- *p = TAG_CLOSURE(tag,
- (StgClosure *)CHARLIKE_CLOSURE((StgChar)w)
- );
+ RELAXED_STORE(p, \
+ TAG_CLOSURE(tag, \
+ (StgClosure *)CHARLIKE_CLOSURE((StgChar)w)
+ ));
}
else if (info == Izh_con_info &&
(StgInt)w >= MIN_INTLIKE && (StgInt)w <= MAX_INTLIKE) {
- *p = TAG_CLOSURE(tag,
- (StgClosure *)INTLIKE_CLOSURE((StgInt)w)
- );
+ RELAXED_STORE(p, \
+ TAG_CLOSURE(tag, \
+ (StgClosure *)INTLIKE_CLOSURE((StgInt)w)
+ ));
}
else {
copy_tag_nolock(p,info,q,sizeofW(StgHeader)+1,gen_no,tag);
@@ -814,10 +823,10 @@ loop:
const StgInfoTable *i;
r = ((StgInd*)q)->indirectee;
if (GET_CLOSURE_TAG(r) == 0) {
- i = r->header.info;
+ i = ACQUIRE_LOAD(&r->header.info);
if (IS_FORWARDING_PTR(i)) {
r = (StgClosure *)UN_FORWARDING_PTR(i);
- i = r->header.info;
+ i = ACQUIRE_LOAD(&r->header.info);
}
if (i == &stg_TSO_info
|| i == &stg_WHITEHOLE_info
@@ -842,7 +851,7 @@ loop:
ASSERT(i != &stg_IND_info);
}
q = r;
- *p = r;
+ RELEASE_STORE(p, r);
goto loop;
}
@@ -868,8 +877,8 @@ loop:
case IND:
// follow chains of indirections, don't evacuate them
- q = ((StgInd*)q)->indirectee;
- *p = q;
+ q = RELAXED_LOAD(&((StgInd*)q)->indirectee);
+ RELAXED_STORE(p, q);
goto loop;
case RET_BCO:
@@ -983,11 +992,12 @@ evacuate_BLACKHOLE(StgClosure **p)
ASSERT(GET_CLOSURE_TAG(q) == 0);
bd = Bdescr((P_)q);
+ const uint16_t flags = RELAXED_LOAD(&bd->flags);
// blackholes can't be in a compact
- ASSERT((bd->flags & BF_COMPACT) == 0);
+ ASSERT((flags & BF_COMPACT) == 0);
- if (RTS_UNLIKELY(bd->flags & BF_NONMOVING)) {
+ if (RTS_UNLIKELY(RELAXED_LOAD(&bd->flags) & BF_NONMOVING)) {
if (major_gc && !deadlock_detect_gc)
markQueuePushClosureGC(&gct->cap->upd_rem_set.queue, q);
return;
@@ -996,18 +1006,18 @@ evacuate_BLACKHOLE(StgClosure **p)
// blackholes *can* be in a large object: when raiseAsync() creates an
// AP_STACK the payload might be large enough to create a large object.
// See #14497.
- if (bd->flags & BF_LARGE) {
+ if (flags & BF_LARGE) {
evacuate_large((P_)q);
return;
}
- if (bd->flags & BF_EVACUATED) {
+ if (flags & BF_EVACUATED) {
if (bd->gen_no < gct->evac_gen_no) {
gct->failed_to_evac = true;
TICK_GC_FAILED_PROMOTION();
}
return;
}
- if (bd->flags & BF_MARKED) {
+ if (flags & BF_MARKED) {
if (!is_marked((P_)q,bd)) {
mark((P_)q,bd);
push_mark_stack((P_)q);
@@ -1015,13 +1025,13 @@ evacuate_BLACKHOLE(StgClosure **p)
return;
}
gen_no = bd->dest_no;
- info = q->header.info;
+ info = ACQUIRE_LOAD(&q->header.info);
if (IS_FORWARDING_PTR(info))
{
StgClosure *e = (StgClosure*)UN_FORWARDING_PTR(info);
*p = e;
if (gen_no < gct->evac_gen_no) { // optimisation
- if (Bdescr((P_)e)->gen_no < gct->evac_gen_no) {
+ if (ACQUIRE_LOAD(&Bdescr((P_)e)->gen_no) < gct->evac_gen_no) {
gct->failed_to_evac = true;
TICK_GC_FAILED_PROMOTION();
}
@@ -1090,13 +1100,11 @@ unchain_thunk_selectors(StgSelector *p, StgClosure *val)
// XXX we do not have BLACKHOLEs any more; replace with
// a THUNK_SELECTOR again. This will go into a loop if it is
// entered, and should result in a NonTermination exception.
- ((StgThunk *)p)->payload[0] = val;
- write_barrier();
- SET_INFO((StgClosure *)p, &stg_sel_0_upd_info);
+ RELAXED_STORE(&((StgThunk *)p)->payload[0], val);
+ SET_INFO_RELEASE((StgClosure *)p, &stg_sel_0_upd_info);
} else {
- ((StgInd *)p)->indirectee = val;
- write_barrier();
- SET_INFO((StgClosure *)p, &stg_IND_info);
+ RELAXED_STORE(&((StgInd *)p)->indirectee, val);
+ SET_INFO_RELEASE((StgClosure *)p, &stg_IND_info);
}
// For the purposes of LDV profiling, we have created an
@@ -1143,7 +1151,7 @@ selector_chain:
// save any space in any case, and updating with an indirection is
// trickier in a non-collected gen: we would have to update the
// mutable list.
- if (bd->flags & (BF_EVACUATED | BF_NONMOVING)) {
+ if (RELAXED_LOAD(&bd->flags) & (BF_EVACUATED | BF_NONMOVING)) {
unchain_thunk_selectors(prev_thunk_selector, (StgClosure *)p);
*q = (StgClosure *)p;
// shortcut, behave as for: if (evac) evacuate(q);
@@ -1198,8 +1206,7 @@ selector_chain:
// need the write-barrier stuff.
// - undo the chain we've built to point to p.
SET_INFO((StgClosure *)p, (const StgInfoTable *)info_ptr);
- write_barrier();
- *q = (StgClosure *)p;
+ RELEASE_STORE(q, (StgClosure *) p);
if (evac) evacuate(q);
unchain_thunk_selectors(prev_thunk_selector, (StgClosure *)p);
return;
@@ -1225,7 +1232,7 @@ selector_loop:
// from-space during marking, for example. We rely on the property
// that evacuate() doesn't mind if it gets passed a to-space pointer.
- info = (StgInfoTable*)selectee->header.info;
+ info = RELAXED_LOAD((StgInfoTable**) &selectee->header.info);
if (IS_FORWARDING_PTR(info)) {
// We don't follow pointers into to-space; the constructor
@@ -1252,7 +1259,7 @@ selector_loop:
info->layout.payload.nptrs));
// Select the right field from the constructor
- StgClosure *val = selectee->payload[field];
+ StgClosure *val = RELAXED_LOAD(&selectee->payload[field]);
#if defined(PROFILING)
// For the purposes of LDV profiling, we have destroyed
@@ -1278,19 +1285,19 @@ selector_loop:
// evaluating until we find the real value, and then
// update the whole chain to point to the value.
val_loop:
- info_ptr = (StgWord)UNTAG_CLOSURE(val)->header.info;
+ info_ptr = ACQUIRE_LOAD((StgWord*) &UNTAG_CLOSURE(val)->header.info);
if (!IS_FORWARDING_PTR(info_ptr))
{
info = INFO_PTR_TO_STRUCT((StgInfoTable *)info_ptr);
switch (info->type) {
case IND:
case IND_STATIC:
- val = ((StgInd *)val)->indirectee;
+ val = RELAXED_LOAD(&((StgInd *)val)->indirectee);
goto val_loop;
case THUNK_SELECTOR:
// Use payload to make a list of thunk selectors, to be
// used in unchain_thunk_selectors
- ((StgClosure*)p)->payload[0] = (StgClosure *)prev_thunk_selector;
+ RELAXED_STORE(&((StgClosure*)p)->payload[0], (StgClosure *)prev_thunk_selector);
prev_thunk_selector = p;
p = (StgSelector*)val;
goto selector_chain;
@@ -1298,7 +1305,7 @@ selector_loop:
break;
}
}
- ((StgClosure*)p)->payload[0] = (StgClosure *)prev_thunk_selector;
+ RELAXED_STORE(&((StgClosure*)p)->payload[0], (StgClosure *)prev_thunk_selector);
prev_thunk_selector = p;
*q = val;
@@ -1320,22 +1327,22 @@ selector_loop:
case IND:
case IND_STATIC:
// Again, we might need to untag a constructor.
- selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
+ selectee = UNTAG_CLOSURE( RELAXED_LOAD(&((StgInd *)selectee)->indirectee) );
goto selector_loop;
case BLACKHOLE:
{
StgClosure *r;
const StgInfoTable *i;
- r = ((StgInd*)selectee)->indirectee;
+ r = ACQUIRE_LOAD(&((StgInd*)selectee)->indirectee);
// establish whether this BH has been updated, and is now an
// indirection, as in evacuate().
if (GET_CLOSURE_TAG(r) == 0) {
- i = r->header.info;
+ i = ACQUIRE_LOAD(&r->header.info);
if (IS_FORWARDING_PTR(i)) {
r = (StgClosure *)UN_FORWARDING_PTR(i);
- i = r->header.info;
+ i = RELAXED_LOAD(&r->header.info);
}
if (i == &stg_TSO_info
|| i == &stg_WHITEHOLE_info
@@ -1346,7 +1353,7 @@ selector_loop:
ASSERT(i != &stg_IND_info);
}
- selectee = UNTAG_CLOSURE( ((StgInd *)selectee)->indirectee );
+ selectee = UNTAG_CLOSURE( RELAXED_LOAD(&((StgInd *)selectee)->indirectee) );
goto selector_loop;
}
diff --git a/rts/sm/GC.c b/rts/sm/GC.c
index c39dcc2e89..a5aa7e1f4e 100644
--- a/rts/sm/GC.c
+++ b/rts/sm/GC.c
@@ -112,14 +112,8 @@ static W_ g0_pcnt_kept = 30; // percentage of g0 live at last minor GC
/* Mut-list stats */
#if defined(DEBUG)
-uint32_t mutlist_MUTVARS,
- mutlist_MUTARRS,
- mutlist_MVARS,
- mutlist_TVAR,
- mutlist_TVAR_WATCH_QUEUE,
- mutlist_TREC_CHUNK,
- mutlist_TREC_HEADER,
- mutlist_OTHERS;
+// For lack of a better option we protect mutlist_scav_stats with oldest_gen->sync
+MutListScavStats mutlist_scav_stats;
#endif
/* Thread-local data for each GC thread
@@ -184,6 +178,36 @@ bdescr *mark_stack_top_bd; // topmost block in the mark stack
bdescr *mark_stack_bd; // current block in the mark stack
StgPtr mark_sp; // pointer to the next unallocated mark stack entry
+
+/* -----------------------------------------------------------------------------
+ Statistics from mut_list scavenging
+ -------------------------------------------------------------------------- */
+
+#if defined(DEBUG)
+void
+zeroMutListScavStats(MutListScavStats *src)
+{
+ memset(src, 0, sizeof(MutListScavStats));
+}
+
+void
+addMutListScavStats(const MutListScavStats *src,
+ MutListScavStats *dest)
+{
+#define ADD_STATS(field) dest->field += src->field;
+ ADD_STATS(n_MUTVAR);
+ ADD_STATS(n_MUTARR);
+ ADD_STATS(n_MVAR);
+ ADD_STATS(n_TVAR);
+ ADD_STATS(n_TREC_CHUNK);
+ ADD_STATS(n_TVAR_WATCH_QUEUE);
+ ADD_STATS(n_TREC_HEADER);
+ ADD_STATS(n_OTHERS);
+#undef ADD_STATS
+}
+#endif /* DEBUG */
+
+
/* -----------------------------------------------------------------------------
GarbageCollect: the main entry point to the garbage collector.
@@ -250,14 +274,7 @@ GarbageCollect (uint32_t collect_gen,
stablePtrLock();
#if defined(DEBUG)
- mutlist_MUTVARS = 0;
- mutlist_MUTARRS = 0;
- mutlist_MVARS = 0;
- mutlist_TVAR = 0;
- mutlist_TVAR_WATCH_QUEUE = 0;
- mutlist_TREC_CHUNK = 0;
- mutlist_TREC_HEADER = 0;
- mutlist_OTHERS = 0;
+ zeroMutListScavStats(&mutlist_scav_stats);
#endif
// attribute any costs to CCS_GC
@@ -520,37 +537,37 @@ GarbageCollect (uint32_t collect_gen,
const gc_thread* thread;
for (i=0; i < n_gc_threads; i++) {
- copied += gc_threads[i]->copied;
+ copied += RELAXED_LOAD(&gc_threads[i]->copied);
}
for (i=0; i < n_gc_threads; i++) {
thread = gc_threads[i];
if (n_gc_threads > 1) {
debugTrace(DEBUG_gc,"thread %d:", i);
debugTrace(DEBUG_gc," copied %ld",
- thread->copied * sizeof(W_));
+ RELAXED_LOAD(&thread->copied) * sizeof(W_));
debugTrace(DEBUG_gc," scanned %ld",
- thread->scanned * sizeof(W_));
+ RELAXED_LOAD(&thread->scanned) * sizeof(W_));
debugTrace(DEBUG_gc," any_work %ld",
- thread->any_work);
+ RELAXED_LOAD(&thread->any_work));
debugTrace(DEBUG_gc," no_work %ld",
- thread->no_work);
+ RELAXED_LOAD(&thread->no_work));
debugTrace(DEBUG_gc," scav_find_work %ld",
- thread->scav_find_work);
+ RELAXED_LOAD(&thread->scav_find_work));
#if defined(THREADED_RTS) && defined(PROF_SPIN)
- gc_spin_spin += thread->gc_spin.spin;
- gc_spin_yield += thread->gc_spin.yield;
- mut_spin_spin += thread->mut_spin.spin;
- mut_spin_yield += thread->mut_spin.yield;
+ gc_spin_spin += RELAXED_LOAD(&thread->gc_spin.spin);
+ gc_spin_yield += RELAXED_LOAD(&thread->gc_spin.yield);
+ mut_spin_spin += RELAXED_LOAD(&thread->mut_spin.spin);
+ mut_spin_yield += RELAXED_LOAD(&thread->mut_spin.yield);
#endif
- any_work += thread->any_work;
- no_work += thread->no_work;
- scav_find_work += thread->scav_find_work;
+ any_work += RELAXED_LOAD(&thread->any_work);
+ no_work += RELAXED_LOAD(&thread->no_work);
+ scav_find_work += RELAXED_LOAD(&thread->scav_find_work);
- par_max_copied = stg_max(gc_threads[i]->copied, par_max_copied);
+ par_max_copied = stg_max(RELAXED_LOAD(&thread->copied), par_max_copied);
par_balanced_copied_acc +=
- stg_min(n_gc_threads * gc_threads[i]->copied, copied);
+ stg_min(n_gc_threads * RELAXED_LOAD(&thread->copied), copied);
}
}
if (n_gc_threads > 1) {
@@ -590,10 +607,14 @@ GarbageCollect (uint32_t collect_gen,
debugTrace(DEBUG_gc,
"mut_list_size: %lu (%d vars, %d arrays, %d MVARs, %d TVARs, %d TVAR_WATCH_QUEUEs, %d TREC_CHUNKs, %d TREC_HEADERs, %d others)",
(unsigned long)(mut_list_size * sizeof(W_)),
- mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS,
- mutlist_TVAR, mutlist_TVAR_WATCH_QUEUE,
- mutlist_TREC_CHUNK, mutlist_TREC_HEADER,
- mutlist_OTHERS);
+ mutlist_scav_stats.n_MUTVAR,
+ mutlist_scav_stats.n_MUTARR,
+ mutlist_scav_stats.n_MVAR,
+ mutlist_scav_stats.n_TVAR,
+ mutlist_scav_stats.n_TVAR_WATCH_QUEUE,
+ mutlist_scav_stats.n_TREC_CHUNK,
+ mutlist_scav_stats.n_TREC_HEADER,
+ mutlist_scav_stats.n_OTHERS);
}
bdescr *next, *prev;
@@ -1109,7 +1130,7 @@ inc_running (void)
static StgWord
dec_running (void)
{
- ASSERT(gc_running_threads != 0);
+ ASSERT(RELAXED_LOAD(&gc_running_threads) != 0);
return atomic_dec(&gc_running_threads);
}
@@ -1119,7 +1140,7 @@ any_work (void)
int g;
gen_workspace *ws;
- gct->any_work++;
+ NONATOMIC_ADD(&gct->any_work, 1);
write_barrier();
@@ -1193,7 +1214,7 @@ loop:
debugTrace(DEBUG_gc, "%d GC threads still running", r);
- while (gc_running_threads != 0) {
+ while (SEQ_CST_LOAD(&gc_running_threads) != 0) {
// usleep(1);
if (any_work()) {
inc_running();
@@ -1257,10 +1278,13 @@ gcWorkerThread (Capability *cap)
// Wait until we're told to continue
RELEASE_SPIN_LOCK(&gct->gc_spin);
- stat_endGCWorker (cap, gct); // write stats before setting gct->wakeup (#17964,#18717)
- gct->wakeup = GC_THREAD_WAITING_TO_CONTINUE;
debugTrace(DEBUG_gc, "GC thread %d waiting to continue...",
gct->thread_index);
+ stat_endGCWorker (cap, gct);
+ // This must come *after* stat_endGCWorker since it serves to
+ // synchronize us with the GC leader, which will later aggregate the
+ // GC statistics (#17964,#18717)
+ SEQ_CST_STORE(&gct->wakeup, GC_THREAD_WAITING_TO_CONTINUE);
ACQUIRE_SPIN_LOCK(&gct->mut_spin);
debugTrace(DEBUG_gc, "GC thread %d on my way...", gct->thread_index);
@@ -1285,7 +1309,7 @@ waitForGcThreads (Capability *cap USED_IF_THREADS, bool idle_cap[])
while(retry) {
for (i=0; i < n_threads; i++) {
if (i == me || idle_cap[i]) continue;
- if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY) {
prodCapability(capabilities[i], cap->running_task);
}
}
@@ -1295,7 +1319,7 @@ waitForGcThreads (Capability *cap USED_IF_THREADS, bool idle_cap[])
if (i == me || idle_cap[i]) continue;
write_barrier();
interruptCapability(capabilities[i]);
- if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY) {
+ if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY) {
retry = true;
}
}
@@ -1352,10 +1376,10 @@ wakeup_gc_threads (uint32_t me USED_IF_THREADS,
if (i == me || idle_cap[i]) continue;
inc_running();
debugTrace(DEBUG_gc, "waking up gc thread %d", i);
- if (gc_threads[i]->wakeup != GC_THREAD_STANDING_BY)
+ if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_STANDING_BY)
barf("wakeup_gc_threads");
- gc_threads[i]->wakeup = GC_THREAD_RUNNING;
+ SEQ_CST_STORE(&gc_threads[i]->wakeup, GC_THREAD_RUNNING);
ACQUIRE_SPIN_LOCK(&gc_threads[i]->mut_spin);
RELEASE_SPIN_LOCK(&gc_threads[i]->gc_spin);
}
@@ -1376,9 +1400,8 @@ shutdown_gc_threads (uint32_t me USED_IF_THREADS,
for (i=0; i < n_gc_threads; i++) {
if (i == me || idle_cap[i]) continue;
- while (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE) {
+ while (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_WAITING_TO_CONTINUE) {
busy_wait_nop();
- write_barrier();
}
}
#endif
@@ -1393,10 +1416,10 @@ releaseGCThreads (Capability *cap USED_IF_THREADS, bool idle_cap[])
uint32_t i;
for (i=0; i < n_threads; i++) {
if (i == me || idle_cap[i]) continue;
- if (gc_threads[i]->wakeup != GC_THREAD_WAITING_TO_CONTINUE)
+ if (SEQ_CST_LOAD(&gc_threads[i]->wakeup) != GC_THREAD_WAITING_TO_CONTINUE)
barf("releaseGCThreads");
- gc_threads[i]->wakeup = GC_THREAD_INACTIVE;
+ SEQ_CST_STORE(&gc_threads[i]->wakeup, GC_THREAD_INACTIVE);
ACQUIRE_SPIN_LOCK(&gc_threads[i]->gc_spin);
RELEASE_SPIN_LOCK(&gc_threads[i]->mut_spin);
}
@@ -1412,7 +1435,7 @@ static void
stash_mut_list (Capability *cap, uint32_t gen_no)
{
cap->saved_mut_lists[gen_no] = cap->mut_lists[gen_no];
- cap->mut_lists[gen_no] = allocBlockOnNode_sync(cap->node);
+ RELEASE_STORE(&cap->mut_lists[gen_no], allocBlockOnNode_sync(cap->node));
}
/* ----------------------------------------------------------------------------
@@ -1438,9 +1461,11 @@ prepare_collected_gen (generation *gen)
// mutable list always has at least one block; this means we can avoid
// a check for NULL in recordMutable().
for (i = 0; i < n_capabilities; i++) {
- freeChain(capabilities[i]->mut_lists[g]);
- capabilities[i]->mut_lists[g] =
- allocBlockOnNode(capNoToNumaNode(i));
+ bdescr *old = RELAXED_LOAD(&capabilities[i]->mut_lists[g]);
+ freeChain(old);
+
+ bdescr *new = allocBlockOnNode(capNoToNumaNode(i));
+ RELAXED_STORE(&capabilities[i]->mut_lists[g], new);
}
}
@@ -1654,7 +1679,7 @@ collect_pinned_object_blocks (void)
bdescr *last = NULL;
if (use_nonmoving && gen == oldest_gen) {
// Mark objects as belonging to the nonmoving heap
- for (bdescr *bd = capabilities[n]->pinned_object_blocks; bd != NULL; bd = bd->link) {
+ for (bdescr *bd = RELAXED_LOAD(&capabilities[n]->pinned_object_blocks); bd != NULL; bd = bd->link) {
bd->flags |= BF_NONMOVING;
bd->gen = oldest_gen;
bd->gen_no = oldest_gen->no;
@@ -1673,8 +1698,8 @@ collect_pinned_object_blocks (void)
if (gen->large_objects != NULL) {
gen->large_objects->u.back = last;
}
- gen->large_objects = capabilities[n]->pinned_object_blocks;
- capabilities[n]->pinned_object_blocks = NULL;
+ g0->large_objects = RELAXED_LOAD(&capabilities[n]->pinned_object_blocks);
+ RELAXED_STORE(&capabilities[n]->pinned_object_blocks, NULL);
}
}
}
diff --git a/rts/sm/GC.h b/rts/sm/GC.h
index bde006913b..c5d5f6ac81 100644
--- a/rts/sm/GC.h
+++ b/rts/sm/GC.h
@@ -42,20 +42,32 @@ extern StgPtr mark_sp;
extern bool work_stealing;
-#if defined(DEBUG)
-extern uint32_t mutlist_MUTVARS, mutlist_MUTARRS, mutlist_MVARS, mutlist_OTHERS,
- mutlist_TVAR,
- mutlist_TVAR_WATCH_QUEUE,
- mutlist_TREC_CHUNK,
- mutlist_TREC_HEADER;
-#endif
-
#if defined(PROF_SPIN) && defined(THREADED_RTS)
extern volatile StgWord64 whitehole_gc_spin;
extern volatile StgWord64 waitForGcThreads_spin;
extern volatile StgWord64 waitForGcThreads_yield;
#endif
+// mutable list scavenging statistics
+#if defined(DEBUG)
+typedef struct {
+ StgWord n_MUTVAR;
+ StgWord n_MUTARR;
+ StgWord n_MVAR;
+ StgWord n_TVAR;
+ StgWord n_TREC_CHUNK;
+ StgWord n_TVAR_WATCH_QUEUE;
+ StgWord n_TREC_HEADER;
+ StgWord n_OTHERS;
+} MutListScavStats;
+
+extern MutListScavStats mutlist_scav_stats;
+
+void zeroMutListScavStats(MutListScavStats *src);
+void addMutListScavStats(const MutListScavStats *src,
+ MutListScavStats *dest);
+#endif /* DEBUG */
+
void gcWorkerThread (Capability *cap);
void initGcThreads (uint32_t from, uint32_t to);
void freeGcThreads (void);
diff --git a/rts/sm/GCAux.c b/rts/sm/GCAux.c
index 11080c1f22..55b4f99596 100644
--- a/rts/sm/GCAux.c
+++ b/rts/sm/GCAux.c
@@ -83,7 +83,7 @@ isAlive(StgClosure *p)
return p;
}
- info = q->header.info;
+ info = RELAXED_LOAD(&q->header.info);
if (IS_FORWARDING_PTR(info)) {
// alive!
diff --git a/rts/sm/GCUtils.c b/rts/sm/GCUtils.c
index 02c26ddf5e..d58fdc48ae 100644
--- a/rts/sm/GCUtils.c
+++ b/rts/sm/GCUtils.c
@@ -249,8 +249,8 @@ todo_block_full (uint32_t size, gen_workspace *ws)
return p;
}
- gct->copied += ws->todo_free - bd->free;
- bd->free = ws->todo_free;
+ gct->copied += ws->todo_free - RELAXED_LOAD(&bd->free);
+ RELAXED_STORE(&bd->free, ws->todo_free);
ASSERT(bd->u.scan >= bd->start && bd->u.scan <= bd->free);
@@ -330,10 +330,11 @@ alloc_todo_block (gen_workspace *ws, uint32_t size)
gct->free_blocks = bd->link;
}
}
- // blocks in to-space get the BF_EVACUATED flag.
- bd->flags = BF_EVACUATED;
- bd->u.scan = bd->start;
initBdescr(bd, ws->gen, ws->gen->to);
+ RELAXED_STORE(&bd->u.scan, RELAXED_LOAD(&bd->start));
+ // blocks in to-space get the BF_EVACUATED flag.
+ // RELEASE here to ensure that bd->gen is visible to other cores.
+ RELEASE_STORE(&bd->flags, BF_EVACUATED);
}
bd->link = NULL;
@@ -345,7 +346,7 @@ alloc_todo_block (gen_workspace *ws, uint32_t size)
// See Note [big objects]
debugTrace(DEBUG_gc, "alloc new todo block %p for gen %d",
- bd->free, ws->gen->no);
+ RELAXED_LOAD(&bd->free), ws->gen->no);
return ws->todo_free;
}
diff --git a/rts/sm/GCUtils.h b/rts/sm/GCUtils.h
index a71d6dcb92..798a795deb 100644
--- a/rts/sm/GCUtils.h
+++ b/rts/sm/GCUtils.h
@@ -67,7 +67,9 @@ recordMutableGen_GC (StgClosure *p, uint32_t gen_no)
bd = new_bd;
gct->mut_lists[gen_no] = bd;
}
- *bd->free++ = (StgWord)p;
+ *bd->free++ = (StgWord) p;
+ // N.B. we are allocating into our Capability-local mut_list, therefore
+ // we don't need an atomic increment.
}
#include "EndPrivate.h"
diff --git a/rts/sm/MarkWeak.c b/rts/sm/MarkWeak.c
index 65b1338f10..b8d120823c 100644
--- a/rts/sm/MarkWeak.c
+++ b/rts/sm/MarkWeak.c
@@ -414,14 +414,13 @@ markWeakPtrList ( void )
StgWeak *w, **last_w;
last_w = &gen->weak_ptr_list;
- for (w = gen->weak_ptr_list; w != NULL; w = w->link) {
+ for (w = gen->weak_ptr_list; w != NULL; w = RELAXED_LOAD(&w->link)) {
// w might be WEAK, EVACUATED, or DEAD_WEAK (actually CON_STATIC) here
#if defined(DEBUG)
{ // careful to do this assertion only reading the info ptr
// once, because during parallel GC it might change under our feet.
- const StgInfoTable *info;
- info = w->header.info;
+ const StgInfoTable *info = RELAXED_LOAD(&w->header.info);
ASSERT(IS_FORWARDING_PTR(info)
|| info == &stg_DEAD_WEAK_info
|| INFO_PTR_TO_STRUCT(info)->type == WEAK);
diff --git a/rts/sm/NonMoving.c b/rts/sm/NonMoving.c
index 05f8481fe2..5cb754b539 100644
--- a/rts/sm/NonMoving.c
+++ b/rts/sm/NonMoving.c
@@ -726,6 +726,7 @@ void nonmovingStop(void)
"waiting for nonmoving collector thread to terminate");
ACQUIRE_LOCK(&concurrent_coll_finished_lock);
waitCondition(&concurrent_coll_finished, &concurrent_coll_finished_lock);
+ joinOSThread(mark_thread);
}
#endif
}
diff --git a/rts/sm/Scav.c b/rts/sm/Scav.c
index 869e6b4dc8..c1f3faf998 100644
--- a/rts/sm/Scav.c
+++ b/rts/sm/Scav.c
@@ -65,6 +65,8 @@
#include "sm/NonMoving.h" // for nonmoving_set_closure_mark_bit
#include "sm/NonMovingScav.h"
+#include <string.h> /* for memset */
+
static void scavenge_large_bitmap (StgPtr p,
StgLargeBitmap *large_bitmap,
StgWord size );
@@ -201,9 +203,9 @@ scavenge_compact(StgCompactNFData *str)
gct->eager_promotion = saved_eager;
if (gct->failed_to_evac) {
- ((StgClosure *)str)->header.info = &stg_COMPACT_NFDATA_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)str)->header.info, &stg_COMPACT_NFDATA_DIRTY_info);
} else {
- ((StgClosure *)str)->header.info = &stg_COMPACT_NFDATA_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)str)->header.info, &stg_COMPACT_NFDATA_CLEAN_info);
}
}
@@ -464,9 +466,9 @@ scavenge_block (bdescr *bd)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- mvar->header.info = &stg_MVAR_DIRTY_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info);
} else {
- mvar->header.info = &stg_MVAR_CLEAN_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info);
}
p += sizeofW(StgMVar);
break;
@@ -481,9 +483,9 @@ scavenge_block (bdescr *bd)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- tvar->header.info = &stg_TVAR_DIRTY_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info);
} else {
- tvar->header.info = &stg_TVAR_CLEAN_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info);
}
p += sizeofW(StgTVar);
break;
@@ -615,9 +617,9 @@ scavenge_block (bdescr *bd)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_CLEAN_info);
}
p += sizeofW(StgMutVar);
break;
@@ -634,9 +636,9 @@ scavenge_block (bdescr *bd)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
} else {
- bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info);
}
p += sizeofW(StgBlockingQueue);
break;
@@ -686,9 +688,9 @@ scavenge_block (bdescr *bd)
p = scavenge_mut_arr_ptrs((StgMutArrPtrs*)p);
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info);
}
gct->eager_promotion = saved_eager_promotion;
@@ -703,9 +705,9 @@ scavenge_block (bdescr *bd)
p = scavenge_mut_arr_ptrs((StgMutArrPtrs*)p);
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -728,9 +730,9 @@ scavenge_block (bdescr *bd)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info);
}
gct->failed_to_evac = true; // always put it on the mutable list.
@@ -749,9 +751,9 @@ scavenge_block (bdescr *bd)
}
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -834,7 +836,7 @@ scavenge_block (bdescr *bd)
if (p > bd->free) {
gct->copied += ws->todo_free - bd->free;
- bd->free = p;
+ RELEASE_STORE(&bd->free, p);
}
debugTrace(DEBUG_gc, " scavenged %ld bytes",
@@ -889,9 +891,9 @@ scavenge_mark_stack(void)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- mvar->header.info = &stg_MVAR_DIRTY_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info);
} else {
- mvar->header.info = &stg_MVAR_CLEAN_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info);
}
break;
}
@@ -905,9 +907,9 @@ scavenge_mark_stack(void)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- tvar->header.info = &stg_TVAR_DIRTY_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info);
} else {
- tvar->header.info = &stg_TVAR_CLEAN_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info);
}
break;
}
@@ -1011,9 +1013,9 @@ scavenge_mark_stack(void)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_VAR_CLEAN_info);
}
break;
}
@@ -1030,9 +1032,9 @@ scavenge_mark_stack(void)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
} else {
- bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info);
}
break;
}
@@ -1078,9 +1080,9 @@ scavenge_mark_stack(void)
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info);
}
gct->eager_promotion = saved_eager_promotion;
@@ -1097,9 +1099,9 @@ scavenge_mark_stack(void)
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *) q)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -1124,9 +1126,9 @@ scavenge_mark_stack(void)
gct->eager_promotion = saved_eager;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info);
}
gct->failed_to_evac = true; // mutable anyhow.
@@ -1145,9 +1147,9 @@ scavenge_mark_stack(void)
}
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -1251,9 +1253,9 @@ scavenge_one(StgPtr p)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- mvar->header.info = &stg_MVAR_DIRTY_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_DIRTY_info);
} else {
- mvar->header.info = &stg_MVAR_CLEAN_info;
+ RELEASE_STORE(&mvar->header.info, &stg_MVAR_CLEAN_info);
}
break;
}
@@ -1267,9 +1269,9 @@ scavenge_one(StgPtr p)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- tvar->header.info = &stg_TVAR_DIRTY_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_DIRTY_info);
} else {
- tvar->header.info = &stg_TVAR_CLEAN_info;
+ RELEASE_STORE(&tvar->header.info, &stg_TVAR_CLEAN_info);
}
break;
}
@@ -1331,9 +1333,9 @@ scavenge_one(StgPtr p)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_MUT_VAR_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_MUT_VAR_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_MUT_VAR_CLEAN_info);
}
break;
}
@@ -1350,9 +1352,9 @@ scavenge_one(StgPtr p)
gct->eager_promotion = saved_eager_promotion;
if (gct->failed_to_evac) {
- bq->header.info = &stg_BLOCKING_QUEUE_DIRTY_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_DIRTY_info);
} else {
- bq->header.info = &stg_BLOCKING_QUEUE_CLEAN_info;
+ RELEASE_STORE(&bq->header.info, &stg_BLOCKING_QUEUE_CLEAN_info);
}
break;
}
@@ -1398,9 +1400,9 @@ scavenge_one(StgPtr p)
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
if (gct->failed_to_evac) {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info);
}
gct->eager_promotion = saved_eager_promotion;
@@ -1415,9 +1417,9 @@ scavenge_one(StgPtr p)
scavenge_mut_arr_ptrs((StgMutArrPtrs *)p);
if (gct->failed_to_evac) {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -1442,9 +1444,9 @@ scavenge_one(StgPtr p)
gct->eager_promotion = saved_eager;
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_CLEAN_info);
}
gct->failed_to_evac = true;
@@ -1463,9 +1465,9 @@ scavenge_one(StgPtr p)
}
if (gct->failed_to_evac) {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_DIRTY_info);
} else {
- ((StgClosure *)q)->header.info = &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)q)->header.info, &stg_SMALL_MUT_ARR_PTRS_FROZEN_CLEAN_info);
}
break;
}
@@ -1583,6 +1585,10 @@ static void
scavenge_mutable_list(bdescr *bd, generation *gen)
{
StgPtr p, q;
+#if defined(DEBUG)
+ MutListScavStats stats; // Local accumulator
+ zeroMutListScavStats(&stats);
+#endif
uint32_t gen_no = gen->no;
gct->evac_gen_no = gen_no;
@@ -1598,31 +1604,31 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
case MUT_VAR_CLEAN:
// can happen due to concurrent writeMutVars
case MUT_VAR_DIRTY:
- mutlist_MUTVARS++; break;
+ stats.n_MUTVAR++; break;
case MUT_ARR_PTRS_CLEAN:
case MUT_ARR_PTRS_DIRTY:
case MUT_ARR_PTRS_FROZEN_CLEAN:
case MUT_ARR_PTRS_FROZEN_DIRTY:
- mutlist_MUTARRS++; break;
+ stats.n_MUTARR++; break;
case MVAR_CLEAN:
barf("MVAR_CLEAN on mutable list");
case MVAR_DIRTY:
- mutlist_MVARS++; break;
+ stats.n_MVAR++; break;
case TVAR:
- mutlist_TVAR++; break;
+ stats.n_TVAR++; break;
case TREC_CHUNK:
- mutlist_TREC_CHUNK++; break;
+ stats.n_TREC_CHUNK++; break;
case MUT_PRIM:
pinfo = ((StgClosure*)p)->header.info;
if (pinfo == &stg_TVAR_WATCH_QUEUE_info)
- mutlist_TVAR_WATCH_QUEUE++;
+ stats.n_TVAR_WATCH_QUEUE++;
else if (pinfo == &stg_TREC_HEADER_info)
- mutlist_TREC_HEADER++;
+ stats.n_TREC_HEADER++;
else
- mutlist_OTHERS++;
+ stats.n_OTHERS++;
break;
default:
- mutlist_OTHERS++; break;
+ stats.n_OTHERS++; break;
}
#endif
@@ -1647,9 +1653,9 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
scavenge_mut_arr_ptrs_marked((StgMutArrPtrs *)p);
if (gct->failed_to_evac) {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_DIRTY_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_DIRTY_info);
} else {
- ((StgClosure *)p)->header.info = &stg_MUT_ARR_PTRS_CLEAN_info;
+ RELEASE_STORE(&((StgClosure *)p)->header.info, &stg_MUT_ARR_PTRS_CLEAN_info);
}
gct->eager_promotion = saved_eager_promotion;
@@ -1671,6 +1677,13 @@ scavenge_mutable_list(bdescr *bd, generation *gen)
}
}
}
+
+#if defined(DEBUG)
+ // For lack of a better option we protect mutlist_scav_stats with oldest_gen->sync
+ ACQUIRE_SPIN_LOCK(&oldest_gen->sync);
+ addMutListScavStats(&stats, &mutlist_scav_stats);
+ RELEASE_SPIN_LOCK(&oldest_gen->sync);
+#endif
}
void
@@ -1740,8 +1753,9 @@ scavenge_static(void)
/* Take this object *off* the static_objects list,
* and put it on the scavenged_static_objects list.
*/
- gct->static_objects = *STATIC_LINK(info,p);
- *STATIC_LINK(info,p) = gct->scavenged_static_objects;
+ StgClosure **link = STATIC_LINK(info,p);
+ gct->static_objects = RELAXED_LOAD(link);
+ RELAXED_STORE(link, gct->scavenged_static_objects);
gct->scavenged_static_objects = flagged_p;
switch (info -> type) {
diff --git a/rts/sm/Storage.c b/rts/sm/Storage.c
index 96bc133d02..98aefa9a4b 100644
--- a/rts/sm/Storage.c
+++ b/rts/sm/Storage.c
@@ -302,7 +302,7 @@ exitStorage (void)
{
nonmovingExit();
updateNurseriesStats();
- stat_exit();
+ stat_exitReport();
}
void
@@ -445,7 +445,7 @@ lockCAF (StgRegTable *reg, StgIndStatic *caf)
Capability *cap = regTableToCapability(reg);
StgInd *bh;
- orig_info = caf->header.info;
+ orig_info = RELAXED_LOAD(&caf->header.info);
#if defined(THREADED_RTS)
const StgInfoTable *cur_info;
@@ -501,12 +501,11 @@ lockCAF (StgRegTable *reg, StgIndStatic *caf)
}
bh->indirectee = (StgClosure *)cap->r.rCurrentTSO;
SET_HDR(bh, &stg_CAF_BLACKHOLE_info, caf->header.prof.ccs);
- // Ensure that above writes are visible before we introduce reference as CAF indirectee.
- write_barrier();
- caf->indirectee = (StgClosure *)bh;
- write_barrier();
- SET_INFO((StgClosure*)caf,&stg_IND_STATIC_info);
+ // RELEASE ordering to ensure that above writes are visible before we
+ // introduce reference as CAF indirectee.
+ RELEASE_STORE(&caf->indirectee, (StgClosure *) bh);
+ SET_INFO_RELEASE((StgClosure*)caf, &stg_IND_STATIC_info);
return bh;
}
@@ -1033,8 +1032,8 @@ allocateMightFail (Capability *cap, W_ n)
g0->n_new_large_words += n;
RELEASE_SM_LOCK;
initBdescr(bd, g0, g0);
- bd->flags = BF_LARGE;
- bd->free = bd->start + n;
+ RELAXED_STORE(&bd->flags, BF_LARGE);
+ RELAXED_STORE(&bd->free, bd->start + n);
cap->total_allocated += n;
return bd->start;
}
@@ -1300,8 +1299,8 @@ dirty_MUT_VAR(StgRegTable *reg, StgMutVar *mvar, StgClosure *old)
Capability *cap = regTableToCapability(reg);
// No barrier required here as no other heap object fields are read. See
// note [Heap memory barriers] in SMP.h.
- if (mvar->header.info == &stg_MUT_VAR_CLEAN_info) {
- mvar->header.info = &stg_MUT_VAR_DIRTY_info;
+ if (RELAXED_LOAD(&mvar->header.info) == &stg_MUT_VAR_CLEAN_info) {
+ SET_INFO((StgClosure*) mvar, &stg_MUT_VAR_DIRTY_info);
recordClosureMutated(cap, (StgClosure *) mvar);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
// See Note [Dirty flags in the non-moving collector] in NonMoving.c
@@ -1323,8 +1322,8 @@ dirty_TVAR(Capability *cap, StgTVar *p,
{
// No barrier required here as no other heap object fields are read. See
// note [Heap memory barriers] in SMP.h.
- if (p->header.info == &stg_TVAR_CLEAN_info) {
- p->header.info = &stg_TVAR_DIRTY_info;
+ if (RELAXED_LOAD(&p->header.info) == &stg_TVAR_CLEAN_info) {
+ SET_INFO((StgClosure*) p, &stg_TVAR_DIRTY_info);
recordClosureMutated(cap,(StgClosure*)p);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
// See Note [Dirty flags in the non-moving collector] in NonMoving.c
@@ -1341,8 +1340,8 @@ dirty_TVAR(Capability *cap, StgTVar *p,
void
setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
{
- if (tso->dirty == 0) {
- tso->dirty = 1;
+ if (RELAXED_LOAD(&tso->dirty) == 0) {
+ RELAXED_STORE(&tso->dirty, 1);
recordClosureMutated(cap,(StgClosure*)tso);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
updateRemembSetPushClosure(cap, (StgClosure *) tso->_link);
@@ -1354,8 +1353,8 @@ setTSOLink (Capability *cap, StgTSO *tso, StgTSO *target)
void
setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
{
- if (tso->dirty == 0) {
- tso->dirty = 1;
+ if (RELAXED_LOAD(&tso->dirty) == 0) {
+ RELAXED_STORE(&tso->dirty, 1);
recordClosureMutated(cap,(StgClosure*)tso);
IF_NONMOVING_WRITE_BARRIER_ENABLED {
updateRemembSetPushClosure(cap, (StgClosure *) tso->block_info.prev);
@@ -1367,8 +1366,8 @@ setTSOPrev (Capability *cap, StgTSO *tso, StgTSO *target)
void
dirty_TSO (Capability *cap, StgTSO *tso)
{
- if (tso->dirty == 0) {
- tso->dirty = 1;
+ if (RELAXED_LOAD(&tso->dirty) == 0) {
+ RELAXED_STORE(&tso->dirty, 1);
recordClosureMutated(cap,(StgClosure*)tso);
}
@@ -1386,8 +1385,8 @@ dirty_STACK (Capability *cap, StgStack *stack)
updateRemembSetPushStack(cap, stack);
}
- if (! (stack->dirty & STACK_DIRTY)) {
- stack->dirty = STACK_DIRTY;
+ if (RELAXED_LOAD(&stack->dirty) == 0) {
+ RELAXED_STORE(&stack->dirty, 1);
recordClosureMutated(cap,(StgClosure*)stack);
}
@@ -1562,10 +1561,13 @@ calcNeeded (bool force_major, memcount *blocks_needed)
for (uint32_t g = 0; g < RtsFlags.GcFlags.generations; g++) {
generation *gen = &generations[g];
-
W_ blocks = gen->live_estimate ? (gen->live_estimate / BLOCK_SIZE_W) : gen->n_blocks;
- blocks += gen->n_large_blocks
- + gen->n_compact_blocks;
+
+ // This can race with allocate() and compactAllocateBlockInternal()
+ // but only needs to be approximate
+ TSAN_ANNOTATE_BENIGN_RACE(&gen->n_large_blocks, "n_large_blocks");
+ blocks += RELAXED_LOAD(&gen->n_large_blocks)
+ + RELAXED_LOAD(&gen->n_compact_blocks);
// we need at least this much space
needed += blocks;
diff --git a/rts/sm/Storage.h b/rts/sm/Storage.h
index 8d90c3ba5f..48ddcf35f5 100644
--- a/rts/sm/Storage.h
+++ b/rts/sm/Storage.h
@@ -72,8 +72,11 @@ bool getNewNursery (Capability *cap);
INLINE_HEADER
bool doYouWantToGC(Capability *cap)
{
+ // This is necessarily approximate since otherwise we would need to take
+ // SM_LOCK to safely look at n_new_large_words.
+ TSAN_ANNOTATE_BENIGN_RACE(&g0->n_new_large_words, "doYouWantToGC(n_new_large_words)");
return ((cap->r.rCurrentNursery->link == NULL && !getNewNursery(cap)) ||
- g0->n_new_large_words >= large_alloc_lim);
+ RELAXED_LOAD(&g0->n_new_large_words) >= large_alloc_lim);
}
/* -----------------------------------------------------------------------------
@@ -91,7 +94,7 @@ INLINE_HEADER void finishedNurseryBlock (Capability *cap, bdescr *bd) {
}
INLINE_HEADER void newNurseryBlock (bdescr *bd) {
- bd->free = bd->start;
+ RELAXED_STORE(&bd->free, bd->start);
}
void updateNurseriesStats (void);
diff --git a/rts/win32/OSThreads.c b/rts/win32/OSThreads.c
index f3bdefd998..ed8a598e51 100644
--- a/rts/win32/OSThreads.c
+++ b/rts/win32/OSThreads.c
@@ -444,6 +444,15 @@ interruptOSThread (OSThreadId id)
CloseHandle(hdl);
}
+void
+joinOSThread (OSThreadId id)
+{
+ int ret = WaitForSingleObject(id, INFINITE);
+ if (ret != WAIT_OBJECT_0) {
+ sysErrorBelch("joinOSThread: error %d", ret);
+ }
+}
+
void setThreadNode (uint32_t node)
{
if (osNumaAvailable())