summaryrefslogtreecommitdiff
path: root/rts/Schedule.c
diff options
context:
space:
mode:
authorSimon Marlow <marlowsd@gmail.com>2010-03-09 14:31:11 +0000
committerSimon Marlow <marlowsd@gmail.com>2010-03-09 14:31:11 +0000
commit7effbbbbdfe7eb05c6402fa9337e358e7e9fadde (patch)
treed5896a7858db38265b77fc799fc54d5151737aab /rts/Schedule.c
parent4e8b07dbc753a5132c574926468ba886728c9049 (diff)
downloadhaskell-7effbbbbdfe7eb05c6402fa9337e358e7e9fadde.tar.gz
Split part of the Task struct into a separate struct InCall
The idea is that this leaves Tasks and OSThread in one-to-one correspondence. The part of a Task that represents a call into Haskell from C is split into a separate struct InCall, pointed to by the Task and the TSO bound to it. A given OSThread/Task thus always uses the same mutex and condition variable, rather than getting a new one for each callback. Conceptually it is simpler, although there are more types and indirections in a few places now. This improves callback performance by removing some of the locks that we had to take when making in-calls. Now we also keep the current Task in a thread-local variable if supported by the OS and gcc (currently only Linux).
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r--rts/Schedule.c144
1 files changed, 65 insertions, 79 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c
index 66af8be7cb..4cca469869 100644
--- a/rts/Schedule.c
+++ b/rts/Schedule.c
@@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task)
// If we are a worker, just exit. If we're a bound thread
// then we will exit below when we've removed our TSO from
// the run queue.
- if (task->tso == NULL && emptyRunQueue(cap)) {
+ if (!isBoundTask(task) && emptyRunQueue(cap)) {
return cap;
}
break;
@@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task)
// Check whether we can run this thread in the current task.
// If not, we have to pass our capability to the right task.
{
- Task *bound = t->bound;
+ InCall *bound = t->bound;
if (bound) {
- if (bound == task) {
+ if (bound->task == task) {
// yes, the Haskell thread is bound to the current native thread
} else {
debugTrace(DEBUG_sched,
@@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task)
}
} else {
// The thread we want to run is unbound.
- if (task->tso) {
+ if (task->incall->tso) {
debugTrace(DEBUG_sched,
"this OS thread cannot run thread %lu",
(unsigned long)t->id);
@@ -441,7 +441,7 @@ run_thread:
ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task);
ASSERT(t->cap == cap);
- ASSERT(t->bound ? t->bound->cap == cap : 1);
+ ASSERT(t->bound ? t->bound->task->cap == cap : 1);
prev_what_next = t->what_next;
@@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task)
// and this task it bound).
return (waiting_for_gc ||
cap->returning_tasks_hd != NULL ||
- (!emptyRunQueue(cap) && (task->tso == NULL
+ (!emptyRunQueue(cap) && (task->incall->tso == NULL
? cap->run_queue_hd->bound != NULL
- : cap->run_queue_hd->bound != task)));
+ : cap->run_queue_hd->bound != task->incall)));
}
// This is the single place where a Task goes to sleep. There are
@@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
next = t->_link;
t->_link = END_TSO_QUEUE;
if (t->what_next == ThreadRelocated
- || t->bound == task // don't move my bound thread
+ || t->bound == task->incall // don't move my bound thread
|| tsoLocked(t)) { // don't move a locked thread
setTSOLink(cap, prev, t);
prev = t;
@@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS,
} else {
appendToRunQueue(free_caps[i],t);
- traceEventMigrateThread (cap, t, free_caps[i]->no);
+ traceEventMigrateThread (cap, t, free_caps[i]->no);
- if (t->bound) { t->bound->cap = free_caps[i]; }
+ if (t->bound) { t->bound->task->cap = free_caps[i]; }
t->cap = free_caps[i];
i++;
}
@@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task)
/* Probably a real deadlock. Send the current main thread the
* Deadlock exception.
*/
- if (task->tso) {
- switch (task->tso->why_blocked) {
+ if (task->incall->tso) {
+ switch (task->incall->tso->why_blocked) {
case BlockedOnSTM:
case BlockedOnBlackHole:
case BlockedOnException:
case BlockedOnMVar:
- throwToSingleThreaded(cap, task->tso,
+ throwToSingleThreaded(cap, task->incall->tso,
(StgClosure *)nonTermination_closure);
return;
default:
@@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t)
/* The TSO attached to this Task may have moved, so update the
* pointer to it.
*/
- if (task->tso == t) {
- task->tso = new_t;
+ if (task->incall->tso == t) {
+ task->incall->tso = new_t;
}
pushOnRunQueue(cap,new_t);
}
@@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
if (t->bound) {
- if (t->bound != task) {
+ if (t->bound != task->incall) {
#if !defined(THREADED_RTS)
// Must be a bound thread that is not the topmost one. Leave
// it on the run queue until the stack has unwound to the
@@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
#endif
}
- ASSERT(task->tso == t);
+ ASSERT(task->incall->tso == t);
if (t->what_next == ThreadComplete) {
if (task->ret) {
// NOTE: return val is tso->sp[1] (see StgStartup.hc)
- *(task->ret) = (StgClosure *)task->tso->sp[1];
+ *(task->ret) = (StgClosure *)task->incall->tso->sp[1];
}
task->stat = Success;
} else {
@@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
}
}
#ifdef DEBUG
- removeThreadLabel((StgWord)task->tso->id);
+ removeThreadLabel((StgWord)task->incall->tso->id);
#endif
// We no longer consider this thread and task to be bound to
@@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t)
// re-used). This was a real bug: the GC updated
// tso->bound->tso which lead to a deadlock.
t->bound = NULL;
- task->tso = NULL;
+ task->incall->tso = NULL;
return rtsTrue; // tells schedule() to return
}
@@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry
)
{
#ifdef FORKPROCESS_PRIMOP_SUPPORTED
- Task *task;
pid_t pid;
StgTSO* t,*next;
Capability *cap;
@@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry
// Any suspended C-calling Tasks are no more, their OS threads
// don't exist now:
- cap->suspended_ccalling_tasks = NULL;
+ cap->suspended_ccalls = NULL;
// Empty the threads lists. Otherwise, the garbage
// collector may attempt to resurrect some of these threads.
@@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry
generations[g].threads = END_TSO_QUEUE;
}
- // Wipe the task list, except the current Task.
- ACQUIRE_LOCK(&sched_mutex);
- for (task = all_tasks; task != NULL; task=task->all_link) {
- if (task != cap->running_task) {
-#if defined(THREADED_RTS)
- initMutex(&task->lock); // see #1391
-#endif
- discardTask(task);
- }
- }
- RELEASE_LOCK(&sched_mutex);
+ discardTasksExcept(cap->running_task);
#if defined(THREADED_RTS)
// Wipe our spare workers list, they no longer exist. New
@@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap )
}
/* -----------------------------------------------------------------------------
- Managing the suspended_ccalling_tasks list.
+ Managing the suspended_ccalls list.
Locks required: sched_mutex
-------------------------------------------------------------------------- */
STATIC_INLINE void
suspendTask (Capability *cap, Task *task)
{
- ASSERT(task->next == NULL && task->prev == NULL);
- task->next = cap->suspended_ccalling_tasks;
- task->prev = NULL;
- if (cap->suspended_ccalling_tasks) {
- cap->suspended_ccalling_tasks->prev = task;
- }
- cap->suspended_ccalling_tasks = task;
+ InCall *incall;
+
+ incall = task->incall;
+ ASSERT(incall->next == NULL && incall->prev == NULL);
+ incall->next = cap->suspended_ccalls;
+ incall->prev = NULL;
+ if (cap->suspended_ccalls) {
+ cap->suspended_ccalls->prev = incall;
+ }
+ cap->suspended_ccalls = incall;
}
STATIC_INLINE void
recoverSuspendedTask (Capability *cap, Task *task)
{
- if (task->prev) {
- task->prev->next = task->next;
+ InCall *incall;
+
+ incall = task->incall;
+ if (incall->prev) {
+ incall->prev->next = incall->next;
} else {
- ASSERT(cap->suspended_ccalling_tasks == task);
- cap->suspended_ccalling_tasks = task->next;
+ ASSERT(cap->suspended_ccalls == incall);
+ cap->suspended_ccalls = incall->next;
}
- if (task->next) {
- task->next->prev = task->prev;
+ if (incall->next) {
+ incall->next->prev = incall->prev;
}
- task->next = task->prev = NULL;
+ incall->next = incall->prev = NULL;
}
/* ---------------------------------------------------------------------------
@@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg)
}
// Hand back capability
- task->suspended_tso = tso;
+ task->incall->suspended_tso = tso;
+ task->incall->suspended_cap = cap;
ACQUIRE_LOCK(&cap->lock);
@@ -1853,6 +1849,7 @@ StgRegTable *
resumeThread (void *task_)
{
StgTSO *tso;
+ InCall *incall;
Capability *cap;
Task *task = task_;
int saved_errno;
@@ -1865,18 +1862,22 @@ resumeThread (void *task_)
saved_winerror = GetLastError();
#endif
- cap = task->cap;
+ incall = task->incall;
+ cap = incall->suspended_cap;
+ task->cap = cap;
+
// Wait for permission to re-enter the RTS with the result.
waitForReturnCapability(&cap,task);
// we might be on a different capability now... but if so, our
- // entry on the suspended_ccalling_tasks list will also have been
+ // entry on the suspended_ccalls list will also have been
// migrated.
// Remove the thread from the suspended list
recoverSuspendedTask(cap,task);
- tso = task->suspended_tso;
- task->suspended_tso = NULL;
+ tso = incall->suspended_tso;
+ incall->suspended_tso = NULL;
+ incall->suspended_cap = NULL;
tso->_link = END_TSO_QUEUE; // no write barrier reqd
traceEventRunThread(cap, tso);
@@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
// This TSO is now a bound thread; make the Task and TSO
// point to each other.
- tso->bound = task;
+ tso->bound = task->incall;
tso->cap = cap;
- task->tso = tso;
+ task->incall->tso = tso;
task->ret = ret;
task->stat = NoStatus;
@@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap)
* ------------------------------------------------------------------------- */
#if defined(THREADED_RTS)
-void OSThreadProcAttr
-workerStart(Task *task)
+void scheduleWorker (Capability *cap, Task *task)
{
- Capability *cap;
-
- // See startWorkerTask().
- ACQUIRE_LOCK(&task->lock);
- cap = task->cap;
- RELEASE_LOCK(&task->lock);
-
- if (RtsFlags.ParFlags.setAffinity) {
- setThreadAffinity(cap->no, n_capabilities);
- }
-
- // set the thread-local pointer to the Task:
- taskEnter(task);
-
// schedule() runs without a lock.
cap = schedule(cap,task);
@@ -2062,6 +2048,8 @@ initScheduler(void)
initSparkPools();
#endif
+ RELEASE_LOCK(&sched_mutex);
+
#if defined(THREADED_RTS)
/*
* Eagerly start one worker to run each Capability, except for
@@ -2075,13 +2063,11 @@ initScheduler(void)
for (i = 1; i < n_capabilities; i++) {
cap = &capabilities[i];
ACQUIRE_LOCK(&cap->lock);
- startWorkerTask(cap, workerStart);
+ startWorkerTask(cap);
RELEASE_LOCK(&cap->lock);
}
}
#endif
-
- RELEASE_LOCK(&sched_mutex);
}
void
@@ -2102,7 +2088,7 @@ exitScheduler(
sched_state = SCHED_INTERRUPTING;
waitForReturnCapability(&task->cap,task);
scheduleDoGC(task->cap,task,rtsFalse);
- ASSERT(task->tso == NULL);
+ ASSERT(task->incall->tso == NULL);
releaseCapability(task->cap);
}
sched_state = SCHED_SHUTTING_DOWN;
@@ -2112,7 +2098,7 @@ exitScheduler(
nat i;
for (i = 0; i < n_capabilities; i++) {
- ASSERT(task->tso == NULL);
+ ASSERT(task->incall->tso == NULL);
shutdownCapability(&capabilities[i], task, wait_foreign);
}
}
@@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major)
// We must grab a new Task here, because the existing Task may be
// associated with a particular Capability, and chained onto the
- // suspended_ccalling_tasks queue.
+ // suspended_ccalls queue.
task = newBoundTask();
waitForReturnCapability(&task->cap,task);
@@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso)
// The TSO attached to this Task may have moved, so update the
// pointer to it.
- if (task->tso == tso) {
- task->tso = new_tso;
+ if (task->incall->tso == tso) {
+ task->incall->tso = new_tso;
}
unlockTSO(new_tso);