Diffstat (limited to 'rts/Capability.c')
-rw-r--r--  rts/Capability.c  301
1 file changed, 186 insertions(+), 115 deletions(-)
diff --git a/rts/Capability.c b/rts/Capability.c
index 21f63f39d9..b0b7f307b5 100644
--- a/rts/Capability.c
+++ b/rts/Capability.c
@@ -43,7 +43,7 @@ nat enabled_capabilities = 0;
// The array of Capabilities. It's important that when we need
// to allocate more Capabilities we don't have to move the existing
// Capabilities, because there may be pointers to them in use
-// (e.g. threads in waitForReturnCapability(), see #8209), so this is
+// (e.g. threads in waitForCapability(), see #8209), so this is
// an array of Capability* rather than an array of Capability.
Capability **capabilities = NULL;
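This comment is the heart of #8209: because capabilities is a table of
pointers, growing it (e.g. for setNumCapabilities) reallocates only the
table, never the Capability structures themselves, so a Capability*
held by a blocked thread stays valid across a resize. A minimal
standalone sketch of the idea (plain malloc/realloc here; the name
growCapabilities is illustrative, not the RTS's own):

#include <stdlib.h>

typedef struct { int no; } Capability;

static Capability **capabilities = NULL;
static unsigned n_capabilities = 0;

/* Grow the table of Capability pointers. Existing Capability objects
 * keep their addresses: only the table of pointers moves. */
static void growCapabilities (unsigned new_n)
{
    Capability **tbl = realloc(capabilities, new_n * sizeof(Capability *));
    if (tbl == NULL) abort();
    capabilities = tbl;
    for (unsigned i = n_capabilities; i < new_n; i++) {
        capabilities[i] = malloc(sizeof(Capability));
        if (capabilities[i] == NULL) abort();
        capabilities[i]->no = (int)i;
    }
    n_capabilities = new_n;
}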
@@ -450,11 +450,10 @@ giveCapabilityToTask (Capability *cap USED_IF_DEBUG, Task *task)
#endif
/* ----------------------------------------------------------------------------
- * Function: releaseCapability(Capability*)
+ * releaseCapability
*
- * Purpose: Letting go of a capability. Causes a
- * 'returning worker' thread or a 'waiting worker'
- * to wake up, in that order.
+ * The current Task (cap->task) releases the Capability. The Capability is
+ * marked free, and if there is any work to do, an appropriate Task is woken up.
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
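For orientation, the wake-up priority that releaseCapability_
implements in the hunks below is, in condensed form (a sketch assuming
the surrounding RTS declarations, not a drop-in replacement; tracing
and the worker-spawning path are elided):

/* Sketch of the release policy. */
static void releaseCapability_sketch (Capability *cap)
{
    cap->running_task = NULL;

    // 1. A Task returning from a foreign call has first claim.
    if (cap->returning_tasks_hd != NULL) {
        giveCapabilityToTask(cap, cap->returning_tasks_hd);
        return;
    }

    // 2. With a pending sync (other than parallel GC), leave the
    //    Capability free for the syncing Task to pick up.
    if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
        last_free_capability = cap;
        return;
    }

    // 3. Otherwise wake a spare worker, or start a new one if there
    //    is work to do (elided).
}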
@@ -474,13 +473,13 @@ releaseCapability_ (Capability* cap,
// the go-ahead to return the result of an external call.
if (cap->returning_tasks_hd != NULL) {
giveCapabilityToTask(cap,cap->returning_tasks_hd);
- // The Task pops itself from the queue (see waitForReturnCapability())
+ // The Task pops itself from the queue (see waitForCapability())
return;
}
// If there is a pending sync, then we should just leave the
// Capability free. The thread trying to sync will be about to
- // call waitForReturnCapability().
+ // call waitForCapability().
if (pending_sync != 0 && pending_sync != SYNC_GC_PAR) {
last_free_capability = cap; // needed?
debugTrace(DEBUG_sched, "sync pending, set capability %d free", cap->no);
@@ -549,62 +548,156 @@ releaseAndWakeupCapability (Capability* cap USED_IF_THREADS)
}
static void
-releaseCapabilityAndQueueWorker (Capability* cap USED_IF_THREADS)
+enqueueWorker (Capability* cap USED_IF_THREADS)
{
Task *task;
- ACQUIRE_LOCK(&cap->lock);
-
task = cap->running_task;
// If the Task is stopped, we shouldn't be yielding; we should
// just be exiting.
ASSERT(!task->stopped);
+ ASSERT(task->worker);
- // If the current task is a worker, save it on the spare_workers
- // list of this Capability. A worker can mark itself as stopped,
- // in which case it is not replaced on the spare_worker queue.
- // This happens when the system is shutting down (see
- // Schedule.c:workerStart()).
- if (!isBoundTask(task))
+ if (cap->n_spare_workers < MAX_SPARE_WORKERS)
+ {
+ task->next = cap->spare_workers;
+ cap->spare_workers = task;
+ cap->n_spare_workers++;
+ }
+ else
{
- if (cap->n_spare_workers < MAX_SPARE_WORKERS)
- {
- task->next = cap->spare_workers;
- cap->spare_workers = task;
- cap->n_spare_workers++;
+ debugTrace(DEBUG_sched, "%d spare workers already, exiting",
+ cap->n_spare_workers);
+ releaseCapability_(cap,rtsFalse);
+ // hold the lock until after workerTaskStop; c.f. scheduleWorker()
+ workerTaskStop(task);
+ RELEASE_LOCK(&cap->lock);
+ shutdownThread();
+ }
+}
+
+#endif
+
+/* ----------------------------------------------------------------------------
+ * waitForWorkerCapability(task)
+ *
+ * waits to be given a Capability, and then returns the Capability. The task
+ * must be either a worker (and on a cap->spare_workers queue), or a bound Task.
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForWorkerCapability (Task *task)
+{
+ Capability *cap;
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
+
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task != NULL) {
+ debugTrace(DEBUG_sched,
+ "capability %d is owned by another task", cap->no);
+ RELEASE_LOCK(&cap->lock);
+ continue;
}
- else
- {
- debugTrace(DEBUG_sched, "%d spare workers already, exiting",
- cap->n_spare_workers);
- releaseCapability_(cap,rtsFalse);
- // hold the lock until after workerTaskStop; c.f. scheduleWorker()
- workerTaskStop(task);
+
+ if (task->cap != cap) {
+ // see Note [migrated bound threads]
+ debugTrace(DEBUG_sched,
+ "task has been migrated to cap %d", task->cap->no);
RELEASE_LOCK(&cap->lock);
- shutdownThread();
+ continue;
+ }
+
+ if (task->incall->tso == NULL) {
+ ASSERT(cap->spare_workers != NULL);
+ // if we're not at the front of the queue, release it
+ // again. This is unlikely to happen.
+ if (cap->spare_workers != task) {
+ giveCapabilityToTask(cap,cap->spare_workers);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->spare_workers = task->next;
+ task->next = NULL;
+ cap->n_spare_workers--;
}
+
+ cap->running_task = task;
+ RELEASE_LOCK(&cap->lock);
+ break;
}
- // Bound tasks just float around attached to their TSOs.
- releaseCapability_(cap,rtsFalse);
+ return cap;
+}
- RELEASE_LOCK(&cap->lock);
+#endif /* THREADED_RTS */
+
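Both wait functions in this patch (waitForWorkerCapability above,
waitForReturnCapability below) use the same two-lock handshake: sleep
on task->cond until task->wakeup is set, then re-validate ownership
under cap->lock and retry if another Task won the race. The generic
shape of the sleep/wake pair, as a standalone pthreads sketch (the
WaitBox type and field names are illustrative; the RTS wraps these
primitives in ACQUIRE_LOCK/waitCondition):

#include <pthread.h>
#include <stdbool.h>

typedef struct {
    pthread_mutex_t lock;
    pthread_cond_t  cond;
    bool            wakeup;   /* set by the waker before signalling */
} WaitBox;

/* Sleeper: block until woken. The flag guards against spurious and
 * stale wakeups, and is consumed before returning. */
static void waitForWakeup (WaitBox *box)
{
    pthread_mutex_lock(&box->lock);
    while (!box->wakeup)
        pthread_cond_wait(&box->cond, &box->lock);
    box->wakeup = false;
    pthread_mutex_unlock(&box->lock);
}

/* Waker: set the flag under the lock, then signal. */
static void sendWakeup (WaitBox *box)
{
    pthread_mutex_lock(&box->lock);
    box->wakeup = true;
    pthread_cond_signal(&box->cond);
    pthread_mutex_unlock(&box->lock);
}

After waking, the RTS loops re-check everything under cap->lock, which
is why a stale or stolen wakeup is handled with a plain continue.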
+/* ----------------------------------------------------------------------------
+ * waitForReturnCapability (Task *task)
+ *
+ * The Task should be on the cap->returning_tasks queue of a Capability. This
+ * function waits for the Task to be woken up, and returns the Capability that
+ * it was woken up on.
+ *
+ * ------------------------------------------------------------------------- */
+
+#if defined(THREADED_RTS)
+
+static Capability * waitForReturnCapability (Task *task)
+{
+ Capability *cap;
+
+ for (;;) {
+ ACQUIRE_LOCK(&task->lock);
+ // task->lock held, cap->lock not held
+ if (!task->wakeup) waitCondition(&task->cond, &task->lock);
+ cap = task->cap;
+ task->wakeup = rtsFalse;
+ RELEASE_LOCK(&task->lock);
+
+ // now check whether we should wake up...
+ ACQUIRE_LOCK(&cap->lock);
+ if (cap->running_task == NULL) {
+ if (cap->returning_tasks_hd != task) {
+ giveCapabilityToTask(cap,cap->returning_tasks_hd);
+ RELEASE_LOCK(&cap->lock);
+ continue;
+ }
+ cap->running_task = task;
+ popReturningTask(cap);
+ RELEASE_LOCK(&cap->lock);
+ break;
+ }
+ RELEASE_LOCK(&cap->lock);
+ }
+
+ return cap;
}
-#endif
+
+#endif /* THREADED_RTS */
/* ----------------------------------------------------------------------------
- * waitForReturnCapability (Capability **pCap, Task *task)
+ * waitForCapability (Capability **pCap, Task *task)
*
* Purpose: when an OS thread returns from an external call,
- * it calls waitForReturnCapability() (via Schedule.resumeThread())
+ * it calls waitForCapability() (via Schedule.resumeThread())
* to wait for permission to enter the RTS & communicate the
* result of the external call back to the Haskell thread that
* made it.
*
* ------------------------------------------------------------------------- */
-void
-waitForReturnCapability (Capability **pCap, Task *task)
+
+void waitForCapability (Capability **pCap, Task *task)
{
#if !defined(THREADED_RTS)
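For context: a safe foreign call is bracketed by suspendThread and
resumeThread, and it is resumeThread that reaches waitForCapability().
Roughly, as a sketch of the wrapper the compiler emits (Result, Args
and some_foreign_function are placeholders; the real entry points live
in Schedule.c):

typedef int Result;   /* placeholder */
typedef int Args;     /* placeholder */
extern Result some_foreign_function (Args args);

static Result callOutSafe (Capability **pcap, Args args)
{
    void *tok = suspendThread(&(*pcap)->r, rtsFalse /* interruptible */);
    Result r = some_foreign_function(args);  // no Capability held here
    // resumeThread() calls waitForCapability(); we may come back on a
    // different Capability.
    *pcap = regTableToCapability(resumeThread(tok));
    return r;
}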
@@ -641,10 +734,9 @@ waitForReturnCapability (Capability **pCap, Task *task)
ASSERT(task->cap == cap);
}
- ACQUIRE_LOCK(&cap->lock);
-
debugTrace(DEBUG_sched, "returning; I want capability %d", cap->no);
+ ACQUIRE_LOCK(&cap->lock);
if (!cap->running_task) {
// It's free; just grab it
cap->running_task = task;
@@ -652,31 +744,7 @@ waitForReturnCapability (Capability **pCap, Task *task)
} else {
newReturningTask(cap,task);
RELEASE_LOCK(&cap->lock);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- // now check whether we should wake up...
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task == NULL) {
- if (cap->returning_tasks_hd != task) {
- giveCapabilityToTask(cap,cap->returning_tasks_hd);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->running_task = task;
- popReturningTask(cap);
- RELEASE_LOCK(&cap->lock);
- break;
- }
- RELEASE_LOCK(&cap->lock);
- }
-
+ cap = waitForReturnCapability(task);
}
#ifdef PROFILING
@@ -691,11 +759,30 @@ waitForReturnCapability (Capability **pCap, Task *task)
#endif
}
-#if defined(THREADED_RTS)
/* ----------------------------------------------------------------------------
* yieldCapability
+ *
+ * Give up the Capability, and return when we have it again. This is called
+ * when either we know that the Capability should be given to another Task, or
+ * there is nothing to do right now. One of the following is true:
+ *
+ * - The current Task is a worker, and there's a bound thread at the head of
+ * the run queue (or vice versa)
+ *
+ * - The run queue is empty. We'll be woken up again when there's work to
+ * do.
+ *
+ * - Another Task is trying to do parallel GC (pending_sync == SYNC_GC_PAR).
+ * We should become a GC worker for a while.
+ *
+ * - Another Task is trying to acquire all the Capabilities (pending_sync !=
+ * SYNC_GC_PAR), either to do a sequential GC, forkProcess, or
+ * setNumCapabilities. We should give up the Capability temporarily.
+ *
* ------------------------------------------------------------------------- */
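The four cases above can be folded into a single predicate plus a
yield loop; the scheduler has exactly this shape. A simplified sketch
in the spirit of Schedule.c's shouldYieldCapability and
scheduleYield() (the real predicate also handles the GC-livelock case
via didGcLast, and the symmetric worker-vs-bound check):

/* Should this Task give up its Capability? (simplified) */
static rtsBool shouldYield (Capability *cap, Task *task)
{
    return (pending_sync != 0                        // a sync is requested
            || cap->returning_tasks_hd != NULL       // a foreign call returns
            || (!emptyRunQueue(cap)                  // head of the run queue
                && peekRunQueue(cap)->bound != NULL  // is bound to another
                && peekRunQueue(cap)->bound != task->incall));
}

/* Yield loop in the style of Schedule.c:scheduleYield(). */
static void yieldLoop (Capability **pcap, Task *task)
{
    Capability *cap = *pcap;
    while (shouldYield(cap, task)) {
        (void) yieldCapability(&cap, task, rtsTrue /* gcAllowed */);
        // cap may now be a different Capability.
    }
    *pcap = cap;
}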
+#if defined (THREADED_RTS)
+
/* See Note [GC livelock] in Schedule.c for why we have gcAllowed
and return the rtsBool */
rtsBool /* Did we GC? */
@@ -714,63 +801,39 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
}
}
- debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
+ debugTrace(DEBUG_sched, "giving up capability %d", cap->no);
- // We must now release the capability and wait to be woken up
- // again.
- task->wakeup = rtsFalse;
- releaseCapabilityAndQueueWorker(cap);
-
- for (;;) {
- ACQUIRE_LOCK(&task->lock);
- // task->lock held, cap->lock not held
- if (!task->wakeup) waitCondition(&task->cond, &task->lock);
- cap = task->cap;
- task->wakeup = rtsFalse;
- RELEASE_LOCK(&task->lock);
-
- debugTrace(DEBUG_sched, "woken up on capability %d", cap->no);
-
- ACQUIRE_LOCK(&cap->lock);
- if (cap->running_task != NULL) {
- debugTrace(DEBUG_sched,
- "capability %d is owned by another task", cap->no);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
+ // We must now release the capability and wait to be woken up again.
+ task->wakeup = rtsFalse;
- if (task->cap != cap) {
- // see Note [migrated bound threads]
- debugTrace(DEBUG_sched,
- "task has been migrated to cap %d", task->cap->no);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
+ ACQUIRE_LOCK(&cap->lock);
- if (task->incall->tso == NULL) {
- ASSERT(cap->spare_workers != NULL);
- // if we're not at the front of the queue, release it
- // again. This is unlikely to happen.
- if (cap->spare_workers != task) {
- giveCapabilityToTask(cap,cap->spare_workers);
- RELEASE_LOCK(&cap->lock);
- continue;
- }
- cap->spare_workers = task->next;
- task->next = NULL;
- cap->n_spare_workers--;
- }
+ // If this is a worker thread, put it on the spare_workers queue
+ if (isWorker(task)) {
+ enqueueWorker(cap);
+ }
- cap->running_task = task;
- RELEASE_LOCK(&cap->lock);
- break;
- }
+ releaseCapability_(cap, rtsFalse);
- debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
- ASSERT(cap->running_task == task);
+ if (isWorker(task) || isBoundTask(task)) {
+ RELEASE_LOCK(&cap->lock);
+ cap = waitForWorkerCapability(task);
+ } else {
+ // Neither a worker Task nor a bound Task. The only way we can be
+ // woken up again is to put ourselves on the returning_tasks queue,
+ // so that's what we do. We still hold cap->lock at this point.
+ // The Task waiting for this Capability does not have it yet, so we
+ // can be sure to be woken up later. (see #10545)
+ newReturningTask(cap,task);
+ RELEASE_LOCK(&cap->lock);
+ cap = waitForReturnCapability(task);
+ }
+
+ debugTrace(DEBUG_sched, "resuming capability %d", cap->no);
+ ASSERT(cap->running_task == task);
#ifdef PROFILING
- cap->r.rCCCS = CCS_SYSTEM;
+ cap->r.rCCCS = CCS_SYSTEM;
#endif
*pCap = cap;
@@ -780,6 +843,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
return rtsFalse;
}
+#endif /* THREADED_RTS */
+
// Note [migrated bound threads]
//
// There's a tricky case where:
@@ -815,6 +880,8 @@ yieldCapability (Capability** pCap, Task *task, rtsBool gcAllowed)
* get every Capability into the GC.
* ------------------------------------------------------------------------- */
+#if defined (THREADED_RTS)
+
void
prodCapability (Capability *cap, Task *task)
{
@@ -826,6 +893,8 @@ prodCapability (Capability *cap, Task *task)
RELEASE_LOCK(&cap->lock);
}
+#endif /* THREADED_RTS */
+
/* ----------------------------------------------------------------------------
* tryGrabCapability
*
@@ -833,6 +902,8 @@ prodCapability (Capability *cap, Task *task)
*
* ------------------------------------------------------------------------- */
+#if defined (THREADED_RTS)
+
rtsBool
tryGrabCapability (Capability *cap, Task *task)
{