path: root/rts/Capability.c
author    Ben Gamari <ben@smart-cactus.org>  2020-11-08 09:29:16 -0500
committer Ben Gamari <ben@smart-cactus.org>  2020-11-08 09:29:16 -0500
commit    638f38c50e80a19275f3a06535a0dd8130a17a53 (patch)
tree      ac18855cd2f39544e4841866fbabb3f86a4d1f35 /rts/Capability.c
parent    b1d2c1f3246b3740589a59bdf7648c13de47c32b (diff)
parent    07e82ba52228580cfbd90ff031e657acbecc715b (diff)
download  haskell-638f38c50e80a19275f3a06535a0dd8130a17a53.tar.gz
Merge remote-tracking branch 'origin/wip/tsan/all'
Diffstat (limited to 'rts/Capability.c')
-rw-r--r--  rts/Capability.c  254
1 file changed, 177 insertions(+), 77 deletions(-)
diff --git a/rts/Capability.c b/rts/Capability.c
index 8dddce7028..a655fc7b3f 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -84,8 +84,8 @@ Capability * rts_unsafeGetMyCapability (void)
STATIC_INLINE bool
globalWorkToDo (void)
{
- return sched_state >= SCHED_INTERRUPTING
- || recent_activity == ACTIVITY_INACTIVE; // need to check for deadlock
+ return RELAXED_LOAD(&sched_state) >= SCHED_INTERRUPTING
+ || RELAXED_LOAD(&recent_activity) == ACTIVITY_INACTIVE; // need to check for deadlock
}
#endif
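
For readers unfamiliar with the accessors introduced above: a minimal sketch of what RELAXED_LOAD and friends might expand to, assuming the GCC/Clang __atomic builtins (the actual definitions live in the RTS headers and may differ in detail):

/* Sketch only: approximate definitions of the atomic-access helpers used in
 * this patch, assuming the GCC/Clang __atomic builtins. */
#define RELAXED_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_RELAXED)
#define RELAXED_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_RELAXED)
#define RELAXED_ADD(ptr, val)    __atomic_fetch_add(ptr, val, __ATOMIC_RELAXED)
#define ACQUIRE_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_ACQUIRE)
#define RELEASE_STORE(ptr, val)  __atomic_store_n(ptr, val, __ATOMIC_RELEASE)
#define SEQ_CST_LOAD(ptr)        __atomic_load_n(ptr, __ATOMIC_SEQ_CST)

On most targets a relaxed load or store compiles to an ordinary memory access, but it tells ThreadSanitizer (and the compiler) that the concurrent access is intentional, which is the point of the wip/tsan branch being merged here.
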
@@ -211,7 +211,10 @@ newReturningTask (Capability *cap, Task *task)
cap->returning_tasks_hd = task;
}
cap->returning_tasks_tl = task;
- cap->n_returning_tasks++;
+
+ // See Note [Data race in shouldYieldCapability] in Schedule.c.
+ RELAXED_ADD(&cap->n_returning_tasks, 1);
+
ASSERT_RETURNING_TASKS(cap,task);
}
@@ -227,7 +230,10 @@ popReturningTask (Capability *cap)
cap->returning_tasks_tl = NULL;
}
task->next = NULL;
- cap->n_returning_tasks--;
+
+ // See Note [Data race in shouldYieldCapability] in Schedule.c.
+ RELAXED_ADD(&cap->n_returning_tasks, -1);
+
ASSERT_RETURNING_TASKS(cap,task);
return task;
}
@@ -306,6 +312,7 @@ initCapability (Capability *cap, uint32_t i)
cap->free_trec_headers = NO_TREC;
cap->transaction_tokens = 0;
cap->context_switch = 0;
+ cap->interrupt = 0;
cap->pinned_object_block = NULL;
cap->pinned_object_blocks = NULL;
@@ -409,36 +416,44 @@ void
moreCapabilities (uint32_t from USED_IF_THREADS, uint32_t to USED_IF_THREADS)
{
#if defined(THREADED_RTS)
- uint32_t i;
- Capability **old_capabilities = capabilities;
+ Capability **new_capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
- capabilities = stgMallocBytes(to * sizeof(Capability*), "moreCapabilities");
+ // We must disable the timer while we do this since the tick handler may
+ // call contextSwitchAllCapabilities, which may see the capabilities array
+ // as we free it. The alternative would be to protect the capabilities
+ // array with a lock but this seems more expensive than necessary.
+ // See #17289.
+ stopTimer();
if (to == 1) {
// THREADED_RTS must work on builds that don't have a mutable
// BaseReg (eg. unregisterised), so in this case
// capabilities[0] must coincide with &MainCapability.
- capabilities[0] = &MainCapability;
+ new_capabilities[0] = &MainCapability;
initCapability(&MainCapability, 0);
}
else
{
- for (i = 0; i < to; i++) {
+ for (uint32_t i = 0; i < to; i++) {
if (i < from) {
- capabilities[i] = old_capabilities[i];
+ new_capabilities[i] = capabilities[i];
} else {
- capabilities[i] = stgMallocBytes(sizeof(Capability),
- "moreCapabilities");
- initCapability(capabilities[i], i);
+ new_capabilities[i] = stgMallocBytes(sizeof(Capability),
+ "moreCapabilities");
+ initCapability(new_capabilities[i], i);
}
}
}
debugTrace(DEBUG_sched, "allocated %d more capabilities", to - from);
+ Capability **old_capabilities = ACQUIRE_LOAD(&capabilities);
+ RELEASE_STORE(&capabilities, new_capabilities);
if (old_capabilities != NULL) {
stgFree(old_capabilities);
}
+
+ startTimer();
#endif
}
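
The array swap in this hunk follows the standard publish-with-release pattern: build the new array, publish the pointer with RELEASE_STORE, and have readers use ACQUIRE_LOAD so they are guaranteed to see fully initialised contents. A stand-alone sketch of the same idiom in plain C11 atomics (hypothetical types and names, not RTS code):

#include <stdatomic.h>
#include <stdlib.h>

typedef struct { int id; } Cap;

static _Atomic(Cap **) caps;   /* published pointer to the current array */

static void publish_caps(unsigned n)
{
    Cap **fresh = malloc(n * sizeof(Cap *));
    for (unsigned i = 0; i < n; i++) {
        fresh[i] = malloc(sizeof(Cap));
        fresh[i]->id = (int) i;
    }
    /* Release: everything written above becomes visible to acquire readers
     * before they can observe the new pointer. */
    atomic_store_explicit(&caps, fresh, memory_order_release);
}

static Cap *lookup_cap(unsigned i)
{
    /* Acquire: pairs with the release store in publish_caps. */
    Cap **snapshot = atomic_load_explicit(&caps, memory_order_acquire);
    return snapshot[i];
}

Note that release/acquire alone does not make it safe to free the old array while readers may still hold a snapshot; that is why the patch stops the timer (the only unsynchronised reader) around the swap.
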
@@ -504,6 +519,9 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
* The current Task (cap->task) releases the Capability. The Capability is
* marked free, and if there is any work to do, an appropriate Task is woken up.
*
+ * The caller must hold cap->lock and will still hold it after
+ * releaseCapability returns.
+ *
* N.B. May need to take all_tasks_mutex.
*
* ------------------------------------------------------------------------- */
@@ -519,8 +537,9 @@ releaseCapability_ (Capability* cap,
ASSERT_PARTIAL_CAPABILITY_INVARIANTS(cap,task);
ASSERT_RETURNING_TASKS(cap,task);
+ ASSERT_LOCK_HELD(&cap->lock);
- cap->running_task = NULL;
+ RELAXED_STORE(&cap->running_task, NULL);
// Check to see whether a worker thread can be given
// the go-ahead to return the result of an external call..
@@ -539,7 +558,7 @@ releaseCapability_ (Capability* cap,
// be currently in waitForCapability() waiting for this
// capability, in which case simply setting it as free would not
// wake up the waiting task.
- PendingSync *sync = pending_sync;
+ PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
if (sync && (sync->type != SYNC_GC_PAR || sync->idle[cap->no])) {
debugTrace(DEBUG_sched, "sync pending, freeing capability %d", cap->no);
return;
@@ -563,7 +582,7 @@ releaseCapability_ (Capability* cap,
// is interrupted, we only create a worker task if there
// are threads that need to be completed. If the system is
// shutting down, we never create a new worker.
- if (sched_state < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
+ if (RELAXED_LOAD(&sched_state) < SCHED_SHUTTING_DOWN || !emptyRunQueue(cap)) {
debugTrace(DEBUG_sched,
"starting new worker on capability %d", cap->no);
startWorkerTask(cap);
@@ -586,7 +605,7 @@ releaseCapability_ (Capability* cap,
#if defined(PROFILING)
cap->r.rCCCS = CCS_IDLE;
#endif
- last_free_capability[cap->node] = cap;
+ RELAXED_STORE(&last_free_capability[cap->node], cap);
debugTrace(DEBUG_sched, "freeing capability %d", cap->no);
}
@@ -638,6 +657,36 @@ enqueueWorker (Capability* cap USED_IF_THREADS)
#endif
+/*
+ * Note [Benign data race due to work-pushing]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * #17276 points out a tricky data race (noticed by ThreadSanitizer) between
+ * waitForWorkerCapability and schedulePushWork. In short, schedulePushWork
+ * works as follows:
+ *
+ * 1. collect the set of all idle capabilities, take cap->lock of each.
+ *
+ * 2. sort through each TSO on the calling capability's run queue and push
+ * some to idle capabilities. This may (if the TSO is a bound thread)
+ * involve setting tso->bound->task->cap despite not holding
+ * tso->bound->task->lock.
+ *
+ * 3. release cap->lock of all idle capabilities.
+ *
+ * Now, step 2 is in principle safe since the capability of the caller of
+ * schedulePushWork *owns* the TSO and therefore the Task to which it is bound.
+ * Furthermore, step 3 ensures that the write in step (2) will be visible to
+ * any core which starts execution of the previously-idle capability.
+ *
+ * However, this argument doesn't quite work for waitForWorkerCapability, which
+ * reads task->cap *without* first owning the capability which owns `task`.
+ * For this reason, we check again whether the task has been migrated to
+ * another capability after taking task->cap->lock. See Note [migrated bound
+ * threads] above.
+ *
+ */
+
/* ----------------------------------------------------------------------------
* waitForWorkerCapability(task)
*
@@ -655,7 +704,13 @@ static Capability * waitForWorkerCapability (Task *task)
ACQUIRE_LOCK(&task->lock);
// task->lock held, cap->lock not held
if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ // The happens-after matches the happens-before in
+ // schedulePushWork, which does own 'task' when it sets 'task->cap'.
+ TSAN_ANNOTATE_HAPPENS_AFTER(&task->cap);
cap = task->cap;
+
+ // See Note [Benign data race due to work-pushing].
+ TSAN_ANNOTATE_BENIGN_RACE(&task->cap, "we will double-check this below");
task->wakeup = false;
RELEASE_LOCK(&task->lock);
@@ -691,7 +746,7 @@ static Capability * waitForWorkerCapability (Task *task)
cap->n_spare_workers--;
}
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
break;
}
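
The TSAN_ANNOTATE_* calls in this hunk presumably compile to nothing in ordinary builds and to ThreadSanitizer's dynamic annotations when the RTS is built with TSan. A rough sketch of such wiring, assuming the standard Annotate* entry points of the TSan runtime (the RTS's actual macros may differ):

/* Sketch only: possible definitions of the TSan annotation macros. */
#if defined(__SANITIZE_THREAD__)
void AnnotateHappensBefore(const char *f, int l, const volatile void *addr);
void AnnotateHappensAfter(const char *f, int l, const volatile void *addr);
void AnnotateBenignRaceSized(const char *f, int l, const volatile void *addr,
                             long size, const char *desc);
#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr) \
    AnnotateHappensBefore(__FILE__, __LINE__, (addr))
#define TSAN_ANNOTATE_HAPPENS_AFTER(addr) \
    AnnotateHappensAfter(__FILE__, __LINE__, (addr))
#define TSAN_ANNOTATE_BENIGN_RACE(addr, desc) \
    AnnotateBenignRaceSized(__FILE__, __LINE__, (addr), sizeof(*(addr)), (desc))
#else
#define TSAN_ANNOTATE_HAPPENS_BEFORE(addr)    /* no-op in normal builds */
#define TSAN_ANNOTATE_HAPPENS_AFTER(addr)     /* no-op in normal builds */
#define TSAN_ANNOTATE_BENIGN_RACE(addr, desc) /* no-op in normal builds */
#endif
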
@@ -732,7 +787,7 @@ static Capability * waitForReturnCapability (Task *task)
RELEASE_LOCK(&cap->lock);
continue;
}
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
popReturningTask(cap);
RELEASE_LOCK(&cap->lock);
break;
@@ -745,6 +800,65 @@ static Capability * waitForReturnCapability (Task *task)
#endif /* THREADED_RTS */
+#if defined(THREADED_RTS)
+
+/* ----------------------------------------------------------------------------
+ * capability_is_busy (Capability *cap)
+ *
+ * A predicate for determining whether the given Capability is currently running
+ * a Task. This can be safely called without holding the Capability's lock
+ * although the result may be inaccurate if it races with the scheduler.
+ * Consequently there is a TSAN suppression for it.
+ *
+ * ------------------------------------------------------------------------- */
+static bool capability_is_busy(const Capability * cap)
+{
+ return RELAXED_LOAD(&cap->running_task) != NULL;
+}
+
+
+/* ----------------------------------------------------------------------------
+ * find_capability_for_task
+ *
+ * Given a Task, identify a reasonable Capability to run it on. We try to
+ * find an idle capability if possible.
+ *
+ * ------------------------------------------------------------------------- */
+
+static Capability * find_capability_for_task(const Task * task)
+{
+ if (task->preferred_capability != -1) {
+ // Does the task have a preferred capability? If so, use it
+ return capabilities[task->preferred_capability %
+ enabled_capabilities];
+ } else {
+ // Try last_free_capability first
+ Capability *cap = RELAXED_LOAD(&last_free_capability[task->node]);
+
+ // N.B. There is a data race here since we are looking at
+ // cap->running_task without taking cap->lock. However, this is
+ // benign since the result is merely guiding our search heuristic.
+ if (!capability_is_busy(cap)) {
+ return cap;
+ } else {
+ // The last_free_capability is already busy, search for a free
+ // capability on this node.
+ for (uint32_t i = task->node; i < enabled_capabilities;
+ i += n_numa_nodes) {
+ // visits all the capabilities on this node, because
+ // cap[i]->node == i % n_numa_nodes
+ if (!RELAXED_LOAD(&capabilities[i]->running_task)) {
+ return capabilities[i];
+ }
+ }
+
+ // Can't find a free one, use last_free_capability.
+ return RELAXED_LOAD(&last_free_capability[task->node]);
+ }
+ }
+}
+#endif /* THREADED_RTS */
+
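
The striding loop in find_capability_for_task visits exactly the capabilities that live on the task's node, because capabilities are assigned to NUMA nodes round-robin (cap i lives on node i % n_numa_nodes). A tiny illustration with made-up numbers, n_numa_nodes = 2 and enabled_capabilities = 6, where a task on node 1 probes capabilities 1, 3 and 5:

/* Illustration only: which capability indices the search loop would probe,
 * assuming round-robin node assignment. Not RTS code. */
#include <stdio.h>

int main(void)
{
    const unsigned n_numa_nodes = 2;          /* assumed example values */
    const unsigned enabled_capabilities = 6;
    const unsigned node = 1;                  /* task->node */

    for (unsigned i = node; i < enabled_capabilities; i += n_numa_nodes) {
        printf("would probe capability %u (node %u)\n", i, i % n_numa_nodes);
    }
    return 0;
}
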
/* ----------------------------------------------------------------------------
* waitForCapability (Capability **pCap, Task *task)
*
@@ -767,38 +881,13 @@ void waitForCapability (Capability **pCap, Task *task)
*pCap = &MainCapability;
#else
- uint32_t i;
Capability *cap = *pCap;
if (cap == NULL) {
- if (task->preferred_capability != -1) {
- cap = capabilities[task->preferred_capability %
- enabled_capabilities];
- } else {
- // Try last_free_capability first
- cap = last_free_capability[task->node];
- if (cap->running_task) {
- // Otherwise, search for a free capability on this node.
- cap = NULL;
- for (i = task->node; i < enabled_capabilities;
- i += n_numa_nodes) {
- // visits all the capabilities on this node, because
- // cap[i]->node == i % n_numa_nodes
- if (!capabilities[i]->running_task) {
- cap = capabilities[i];
- break;
- }
- }
- if (cap == NULL) {
- // Can't find a free one, use last_free_capability.
- cap = last_free_capability[task->node];
- }
- }
- }
+ cap = find_capability_for_task(task);
// record the Capability as the one this Task is now associated with.
task->cap = cap;
-
} else {
ASSERT(task->cap == cap);
}
@@ -808,7 +897,7 @@ void waitForCapability (Capability **pCap, Task *task)
ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
// It's free; just grab it
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
} else {
newReturningTask(cap,task);
@@ -872,7 +961,7 @@ yieldCapability
if (gcAllowed)
{
- PendingSync *sync = pending_sync;
+ PendingSync *sync = SEQ_CST_LOAD(&pending_sync);
if (sync) {
switch (sync->type) {
@@ -943,33 +1032,41 @@ yieldCapability
#endif /* THREADED_RTS */
-// Note [migrated bound threads]
-//
-// There's a tricky case where:
-// - cap A is running an unbound thread T1
-// - there is a bound thread T2 at the head of the run queue on cap A
-// - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
-// - T1 returns quickly grabbing A again (T2 is still waking up on A)
-// - T1 blocks, the scheduler migrates T2 to cap B
-// - the task bound to T2 wakes up on cap B
-//
-// We take advantage of the following invariant:
-//
-// - A bound thread can only be migrated by the holder of the
-// Capability on which the bound thread currently lives. So, if we
-// hold Capability C, and task->cap == C, then task cannot be
-// migrated under our feet.
-
-// Note [migrated bound threads 2]
-//
-// Second tricky case;
-// - A bound Task becomes a GC thread
-// - scheduleDoGC() migrates the thread belonging to this Task,
-// because the Capability it is on is disabled
-// - after GC, gcWorkerThread() returns, but now we are
-// holding a Capability that is not the same as task->cap
-// - Hence we must check for this case and immediately give up the
-// cap we hold.
+/*
+ * Note [migrated bound threads]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * There's a tricky case where:
+ * - cap A is running an unbound thread T1
+ * - there is a bound thread T2 at the head of the run queue on cap A
+ * - T1 makes a safe foreign call, the task bound to T2 is woken up on cap A
+ * - T1 returns quickly grabbing A again (T2 is still waking up on A)
+ * - T1 blocks, the scheduler migrates T2 to cap B
+ * - the task bound to T2 wakes up on cap B
+ *
+ * We take advantage of the following invariant:
+ *
+ * - A bound thread can only be migrated by the holder of the
+ * Capability on which the bound thread currently lives. So, if we
+ * hold Capability C, and task->cap == C, then task cannot be
+ * migrated under our feet.
+ *
+ * See also Note [Benign data race due to work-pushing].
+ *
+ *
+ * Note [migrated bound threads 2]
+ * ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+ *
+ * Second tricky case:
+ * - A bound Task becomes a GC thread
+ * - scheduleDoGC() migrates the thread belonging to this Task,
+ * because the Capability it is on is disabled
+ * - after GC, gcWorkerThread() returns, but now we are
+ * holding a Capability that is not the same as task->cap
+ * - Hence we must check for this case and immediately give up the
+ * cap we hold.
+ *
+ */
/* ----------------------------------------------------------------------------
* prodCapability
@@ -1006,7 +1103,10 @@ bool
tryGrabCapability (Capability *cap, Task *task)
{
int r;
- if (cap->running_task != NULL) return false;
+ // N.B. This is benign as we will check again after taking the lock.
+ TSAN_ANNOTATE_BENIGN_RACE(&cap->running_task, "tryGrabCapability (cap->running_task)");
+ if (RELAXED_LOAD(&cap->running_task) != NULL) return false;
+
r = TRY_ACQUIRE_LOCK(&cap->lock);
if (r != 0) return false;
if (cap->running_task != NULL) {
@@ -1014,7 +1114,7 @@ tryGrabCapability (Capability *cap, Task *task)
return false;
}
task->cap = cap;
- cap->running_task = task;
+ RELAXED_STORE(&cap->running_task, task);
RELEASE_LOCK(&cap->lock);
return true;
}
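
tryGrabCapability is an instance of the usual double-checked pattern: a cheap relaxed peek that may be stale, followed by the authoritative re-check under the lock, so a racy answer can only cause a wasted attempt, never an incorrect grab. A generic sketch of the pattern using pthreads and C11 atomics (illustrative only, not RTS code):

#include <pthread.h>
#include <stdatomic.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    _Atomic(void *) owner;   /* NULL when the slot is free */
} Slot;

static bool try_grab(Slot *s, void *me)
{
    /* Racy fast path: may be stale, but at worst we back off needlessly. */
    if (atomic_load_explicit(&s->owner, memory_order_relaxed) != NULL)
        return false;

    if (pthread_mutex_trylock(&s->lock) != 0)
        return false;

    /* Authoritative re-check while holding the lock. */
    if (atomic_load_explicit(&s->owner, memory_order_relaxed) != NULL) {
        pthread_mutex_unlock(&s->lock);
        return false;
    }
    atomic_store_explicit(&s->owner, me, memory_order_relaxed);
    pthread_mutex_unlock(&s->lock);
    return true;
}
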
@@ -1263,7 +1363,7 @@ void
setIOManagerControlFd(uint32_t cap_no USED_IF_THREADS, int fd USED_IF_THREADS) {
#if defined(THREADED_RTS)
if (cap_no < n_capabilities) {
- capabilities[cap_no]->io_manager_control_wr_fd = fd;
+ RELAXED_STORE(&capabilities[cap_no]->io_manager_control_wr_fd, fd);
} else {
errorBelch("warning: setIOManagerControlFd called with illegal capability number.");
}