summaryrefslogtreecommitdiff
path: root/rts
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2015-06-19 15:12:24 +0100
committerSimon Marlow <marlowsd@gmail.com>2015-06-26 09:32:23 +0100
commit111ba4beda4ffc48381723da12e5b237d7f9ac59 (patch)
tree84ec683a20ea340bd60877daa8ab962e5eebaa2e /rts
parentbe0ce8718ea40b091e69dd48fe6bc62b6b551154 (diff)
downloadhaskell-111ba4beda4ffc48381723da12e5b237d7f9ac59.tar.gz
Fix deadlock (#10545)
yieldCapability() was not prepared to be called by a Task that is not either a worker or a bound Task. This could happen if we ended up in yieldCapability via this call stack: performGC() scheduleDoGC() requestSync() yieldCapability() and there were a few other ways this could happen via requestSync. The fix is to handle this case in yieldCapability(): when the Task is not a worker or a bound Task, we put it on the returning_workers queue, where it will be woken up again. Summary of changes: * `yieldCapability`: factored out subroutine waitForWorkerCapability` * `waitForReturnCapability` renamed to `waitForCapability`, and factored out subroutine `waitForReturnCapability` * `releaseCapabilityAndQueue` worker renamed to `enqueueWorker`, does not take a lock and no longer tests if `!isBoundTask()` * `yieldCapability` adjusted for refactorings, only change in behavior is when it is not a worker or bound task. Test Plan: * new test concurrent/should_run/performGC * validate Reviewers: niteria, austin, ezyang, bgamari Subscribers: thomie, bgamari Differential Revision: https://phabricator.haskell.org/D997 GHC Trac Issues: #10545
Diffstat (limited to 'rts')
-rw-r--r--rts/Capability.c301
-rw-r--r--rts/Capability.h8
-rw-r--r--rts/RtsAPI.c2
-rw-r--r--rts/Schedule.c14
-rw-r--r--rts/Task.h11
5 files changed, 206 insertions, 130 deletions
diff --git a/rts/Capability.c b/rts/Capability.c
index 21f63f39d9..b0b7f307b5 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -43,7 +43,7 @@ nat enabled_capabilities = 0;
// The array of Capabilities. It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
-// (e.g. threads in waitForReturnCapability(), see #8209), so this is
+// (e.g. threads in waitForCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;
@@ -450,11 +450,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
#endif
/* ----------------------------------------------------------------------------
- * Function: releaseCapability(Capability*)
+ * releaseCapability
*
- * Purpose: Letting go of a capability. Causes a
- * 'returning worker' thread or a 'waiting worker'
- * to wake up, in that order.
+ * The current Task (cap->task) releases the Capability. The Capability is
+ * marked free, and if there is any work to do, an appropriate Task is woken up.
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
@@ -474,13 +473,13 @@ releaseCapability_ (Capability* cap,
// the go-ahead to return the result of an external call..
if (cap->returning_tasks_hd != NULL) {
giveCapabilityToTask(cap,cap->returning_tasks_hd);
- // The Task pops itself from the queue (see waitForReturnCapability())
+ // The Task pops itself from the queue (see waitForCapability())
return;
}
// If there is a pending sync, then we should just leave the
// Capability free. The thread trying to sync will be about to
- // call waitForReturnCapability().
+ // call waitForCapability().
if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
last_free_capability = cap; // needed?
debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
@@ -549,62 +548,156 @@ releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
}
static void
-releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
+enqueueWorker (Capability* cap USED_IF_THREADS)
{
Task *task;
- ACQUIRE_LOCK(&cap->lock);
-
task = cap->running_task;
// If the Task is stopped, we shouldn't be yielding, we should
// be just exiting.
ASSERT(!task->stopped);
+ ASSERT(task->worker);
- // If the current task is a worker, save it on the spare_workers
- // list of this Capability. A worker can mark itself as stopped,
- // in which case it is not replaced on the spare_worker queue.
- // This happens when the system is shutting down (see
- // Schedule.c:workerStart()).
- if (!isBoundTask(task))
+ if (cap->n_spare_workers < MAX_SPARE_WORKERS)
+ {
+ task->next = cap->spare_workers;
+ cap->spare_workers = task;
+ cap->n_spare_workers++;
+ }
+ else
{
- if (cap->n_spare_workers < MAX_SPARE_WORKERS)
- {
- task->next = cap->spare_workers;
- cap->spare_workers = task;
- cap->n_spare_workers++;
+ debugTrace(DEBUG_sched, "%d spare workers already, exiting",
+ cap->n_spare_workers);
+ releaseCapability_(cap,rtsFalse);
+ // hold the lock until after workerTaskStop; c.f. scheduleWorker()
+ workerTaskStop(task);
+ RELEASE_LOCK(&cap->lock);
+ shutdownThread();
+ }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+ * waitForWorkerCapability(task)
+ *
+ * waits to be given a Capability, and then returns the Capability. The task
+ * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForWorkerCapability (Task *task)
+{
+ Capability *cap;
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
+
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ debugTrace(DEBUG_sched,
+ "capability %d is owned by another task", cap->no);
+ RELEASE_LOCK(&cap->lock);
+ continue;
}
- else
- {
- debugTrace(DEBUG_sched, "%d spare workers already, exiting",
- cap->n_spare_workers);
- releaseCapability_(cap,rtsFalse);
- // hold the lock until after workerTaskStop; c.f. scheduleWorker()
- workerTaskStop(task);
+
+ if (task->cap != cap) {
+ // see Note [migrated bound threads]
+ debugTrace(DEBUG_sched,
+ "task has been migrated to cap %d", task->cap->no);
RELEASE_LOCK(&cap->lock);
- shutdownThread();
+ continue;
+ }
+
+ if (task->incall->tso == NULL) {
+ ASSERT(cap->spare_workers != NULL);
+ // if we're not at the front of the queue, release it
+ // again. This is unlikely to happen.
+ if (cap->spare_workers != task) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->spare_workers = task->next;
+ task->next = NULL;
+ cap->n_spare_workers--;
}
+
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ break;
}
- // Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap,rtsFalse);
+ return cap;
+}
- RELEASE_LOCK(&cap->lock);
+#endif /* THREADED_RTS */
+
+/* ----------------------------------------------------------------------------
+ * waitForReturnCapability (Task *task)
+ *
+ * The Task should be on the cap->returning_tasks queue of a Capability. This
+ * function waits for the Task to be woken up, and returns the Capability that
+ * it was woken up on.
+ *
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForReturnCapability (Task *task)
+{
+ Capability *cap;
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ // now check whether we should wake up...
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ if (cap->returning_tasks_hd != task) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->running_task = task;
+ popReturningTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+
+ return cap;
}
-#endif
+
+#endif /* THREADED_RTS */
/* ----------------------------------------------------------------------------
- * waitForReturnCapability (Capability **pCap, Task *task)
+ * waitForCapability (Capability **pCap, Task *task)
*
* Purpose: when an OS thread returns from an external call,
- * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * it calls waitForCapability() (via Schedule.resumeThread())
* to wait for permission to enter the RTS & communicate the
* result of the external call back to the Haskell thread that
* made it.
*
* ------------------------------------------------------------------------- */
-void
-waitForReturnCapability (Capability **pCap, Task *task)
+
+void waitForCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)
@@ -641,10 +734,9 @@ waitForReturnCapability (Capability **pCap, Task *task)
ASSERT(task->cap == cap);
}
- ACQUIRE_LOCK(&cap->lock);
-
debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
+ ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
// It's free; just grab it
cap->running_task = task;
@@ -652,31 +744,7 @@ waitForReturnCapability (Capability **pCap, Task *task)
} else {
newReturningTask(cap,task);
RELEASE_LOCK(&cap->lock);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- // now check whether we should wake up...
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task == NULL) {
- if (cap->returning_tasks_hd != task) {
- giveCapabilityToTask(cap,cap->returning_tasks_hd);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->running_task = task;
- popReturningTask(cap);
- RELEASE_LOCK(&cap->lock);
- break;
- }
- RELEASE_LOCK(&cap->lock);
- }
-
+ cap = waitForReturnCapability(task);
}
#ifdef PROFILING
@@ -691,11 +759,30 @@ waitForReturnCapability (Capability **pCap, Task *task)
#endif
}
-#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
* yieldCapability
+ *
+ * Give up the Capability, and return when we have it again. This is called
+ * when either we know that the Capability should be given to another Task, or
+ * there is nothing to do right now. One of the following is true:
+ *
+ * - The current Task is a worker, and there's a bound thread at the head of
+ * the run queue (or vice versa)
+ *
+ * - The run queue is empty. We'll be woken up again when there's work to
+ * do.
+ *
+ * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
+ * We should become a GC worker for a while.
+ *
+ * - Another Task is trying to acquire all the Capabilities (pending_sync !=
+ * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
+ * setNumCapabilities. We should give up the Capability temporarily.
+ *
* ------------------------------------------------------------------------- */
+#if defined (THREADED_RTS)
+
/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
and return the rtsBool */
rtsBool /* Did we GC? */
@@ -714,63 +801,39 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
}
}
- debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
+ debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
- // We must now release the capability and wait to be woken up
- // again.
- task->wakeup = rtsFalse;
- releaseCapabilityAndQueueWorker(cap);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
-
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task != NULL) {
- debugTrace(DEBUG_sched,
- "capability %d is owned by another task", cap->no);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
+ // We must now release the capability and wait to be woken up again.
+ task->wakeup = rtsFalse;
- if (task->cap != cap) {
- // see Note [migrated bound threads]
- debugTrace(DEBUG_sched,
- "task has been migrated to cap %d", task->cap->no);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
+ ACQUIRE_LOCK(&cap->lock);
- if (task->incall->tso == NULL) {
- ASSERT(cap->spare_workers != NULL);
- // if we're not at the front of the queue, release it
- // again. This is unlikely to happen.
- if (cap->spare_workers != task) {
- giveCapabilityToTask(cap,cap->spare_workers);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->spare_workers = task->next;
- task->next = NULL;
- cap->n_spare_workers--;
- }
+ // If this is a worker thread, put it on the spare_workers queue
+ if (isWorker(task)) {
+ enqueueWorker(cap);
+ }
- cap->running_task = task;
- RELEASE_LOCK(&cap->lock);
- break;
- }
+ releaseCapability_(cap, rtsFalse);
- debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
- ASSERT(cap->running_task == task);
+ if (isWorker(task) || isBoundTask(task)) {
+ RELEASE_LOCK(&cap->lock);
+ cap = waitForWorkerCapability(task);
+ } else {
+ // Not a worker Task, or a bound Task. The only way we can be woken up
+ // again is to put ourselves on the returning_tasks queue, so that's
+ // what we do. We still hold cap->lock at this point
+ // The Task waiting for this Capability does not have it
+ // yet, so we can be sure to be woken up later. (see #10545)
+ newReturningTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
+ cap = waitForReturnCapability(task);
+ }
+
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
+ ASSERT(cap->running_task == task);
#ifdef PROFILING
- cap->r.rCCCS = CCS_SYSTEM;
+ cap->r.rCCCS = CCS_SYSTEM;
#endif
*pCap = cap;
@@ -780,6 +843,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
return rtsFalse;
}
+#endif /* THREADED_RTS */
+
// Note [migrated bound threads]
//
// There's a tricky case where:
@@ -815,6 +880,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
* get every Capability into the GC.
* ------------------------------------------------------------------------- */
+#if defined (THREADED_RTS)
+
void
prodCapability (Capability *cap, Task *task)
{
@@ -826,6 +893,8 @@ prodCapability (Capability *cap, Task *task)
RELEASE_LOCK(&cap->lock);
}
+#endif /* THREADED_RTS */
+
/* ----------------------------------------------------------------------------
* tryGrabCapability
*
@@ -833,6 +902,8 @@ prodCapability (Capability *cap, Task *task)
*
* ------------------------------------------------------------------------- */
+#if defined (THREADED_RTS)
+
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{
diff --git a/rts/Capability.h b/rts/Capability.h
index 420bfd5c80..fb9f0aa181 100644
--- a/rts/Capability.h
+++ b/rts/Capability.h
@@ -248,7 +248,7 @@ extern volatile StgWord pending_sync;
//
// On return, *cap is non-NULL, and points to the Capability acquired.
//
-void waitForReturnCapability (Capability **cap/*in/out*/, Task *task);
+void waitForCapability (Capability **cap/*in/out*/, Task *task);
EXTERN_INLINE void recordMutableCap (StgClosure *p, Capability *cap, nat gen);
@@ -269,12 +269,6 @@ EXTERN_INLINE void recordClosureMutated (Capability *cap, StgClosure *p);
//
rtsBool yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed);
-// Acquires a capability for doing some work.
-//
-// On return: pCap points to the capability.
-//
-void waitForCapability (Task *task, Mutex *mutex, Capability **pCap);
-
// Wakes up a worker thread on just one Capability, used when we
// need to service some global event.
//
diff --git a/rts/RtsAPI.c b/rts/RtsAPI.c
index fb91fafdd3..2b3ad74a17 100644
--- a/rts/RtsAPI.c
+++ b/rts/RtsAPI.c
@@ -564,7 +564,7 @@ rts_lock (void)
}
cap = NULL;
- waitForReturnCapability(&cap, task);
+ waitForCapability(&cap, task);
if (task->incall->prev_stack == NULL) {
// This is a new outermost call from C into Haskell land.
diff --git a/rts/Schedule.c b/rts/Schedule.c
index f81fc0e703..6edb7d063e 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -1424,7 +1424,7 @@ static void acquireAllCapabilities(Capability *cap, Task *task)
// all the Capabilities, but even so it's a slightly
// unsavoury invariant.
task->cap = tmpcap;
- waitForReturnCapability(&tmpcap, task);
+ waitForCapability(&tmpcap, task);
if (tmpcap->no != i) {
barf("acquireAllCapabilities: got the wrong capability");
}
@@ -1801,7 +1801,7 @@ forkProcess(HsStablePtr *entry
task = newBoundTask();
cap = NULL;
- waitForReturnCapability(&cap, task);
+ waitForCapability(&cap, task);
#ifdef THREADED_RTS
do {
@@ -2278,7 +2278,7 @@ resumeThread (void *task_)
task->cap = cap;
// Wait for permission to re-enter the RTS with the result.
- waitForReturnCapability(&cap,task);
+ waitForCapability(&cap,task);
// we might be on a different capability now... but if so, our
// entry on the suspended_ccalls list will also have been
// migrated.
@@ -2408,7 +2408,7 @@ void scheduleWorker (Capability *cap, Task *task)
// cap->lock until we've finished workerTaskStop() below.
//
// There may be workers still involved in foreign calls; those
- // will just block in waitForReturnCapability() because the
+ // will just block in waitForCapability() because the
// Capability has been shut down.
//
ACQUIRE_LOCK(&cap->lock);
@@ -2499,7 +2499,7 @@ exitScheduler (rtsBool wait_foreign USED_IF_THREADS)
if (sched_state < SCHED_SHUTTING_DOWN) {
sched_state = SCHED_INTERRUPTING;
Capability *cap = task->cap;
- waitForReturnCapability(&cap,task);
+ waitForCapability(&cap,task);
scheduleDoGC(&cap,task,rtsTrue);
ASSERT(task->incall->tso == NULL);
releaseCapability(cap);
@@ -2523,7 +2523,7 @@ freeScheduler( void )
still_running = freeTaskManager();
// We can only free the Capabilities if there are no Tasks still
// running. We might have a Task about to return from a foreign
- // call into waitForReturnCapability(), for example (actually,
+ // call into waitForCapability(), for example (actually,
// this should be the *only* thing that a still-running Task can
// do at this point, and it will block waiting for the
// Capability).
@@ -2567,7 +2567,7 @@ performGC_(rtsBool force_major)
// TODO: do we need to traceTask*() here?
- waitForReturnCapability(&cap,task);
+ waitForCapability(&cap,task);
scheduleDoGC(&cap,task,force_major);
releaseCapability(cap);
boundTaskExiting(task);
diff --git a/rts/Task.h b/rts/Task.h
index 5c7b049631..58798bd24d 100644
--- a/rts/Task.h
+++ b/rts/Task.h
@@ -167,6 +167,17 @@ isBoundTask (Task *task)
return (task->incall->tso != NULL);
}
+// A Task is currently a worker if
+// (a) it was created as a worker (task->worker), and
+// (b) it has not left and re-entered Haskell, in which case
+// task->incall->prev_stack would be non-NULL.
+//
+INLINE_HEADER rtsBool
+isWorker (Task *task)
+{
+ return (task->worker && task->incall->prev_stack == NULL);
+}
+
// Linked list of all tasks.
//
extern Task *all_tasks;