diff options
author | Simon Marlow <marlowsd@gmail.com> | 2010-03-09 14:31:11 +0000 |
---|---|---|
committer | Simon Marlow <marlowsd@gmail.com> | 2010-03-09 14:31:11 +0000 |
commit | 7effbbbbdfe7eb05c6402fa9337e358e7e9fadde (patch) | |
tree | d5896a7858db38265b77fc799fc54d5151737aab /rts/Schedule.c | |
parent | 4e8b07dbc753a5132c574926468ba886728c9049 (diff) | |
download | haskell-7effbbbbdfe7eb05c6402fa9337e358e7e9fadde.tar.gz |
Split part of the Task struct into a separate struct InCall
The idea is that this leaves Tasks and OSThread in one-to-one
correspondence. The part of a Task that represents a call into
Haskell from C is split into a separate struct InCall, pointed to by
the Task and the TSO bound to it. A given OSThread/Task thus always
uses the same mutex and condition variable, rather than getting a new
one for each callback. Conceptually it is simpler, although there are
more types and indirections in a few places now.
This improves callback performance by removing some of the locks that
we had to take when making in-calls. Now we also keep the current Task
in a thread-local variable if supported by the OS and gcc (currently
only Linux).
Diffstat (limited to 'rts/Schedule.c')
-rw-r--r-- | rts/Schedule.c | 144 |
1 files changed, 65 insertions, 79 deletions
diff --git a/rts/Schedule.c b/rts/Schedule.c index 66af8be7cb..4cca469869 100644 --- a/rts/Schedule.c +++ b/rts/Schedule.c @@ -312,7 +312,7 @@ schedule (Capability *initialCapability, Task *task) // If we are a worker, just exit. If we're a bound thread // then we will exit below when we've removed our TSO from // the run queue. - if (task->tso == NULL && emptyRunQueue(cap)) { + if (!isBoundTask(task) && emptyRunQueue(cap)) { return cap; } break; @@ -378,10 +378,10 @@ schedule (Capability *initialCapability, Task *task) // Check whether we can run this thread in the current task. // If not, we have to pass our capability to the right task. { - Task *bound = t->bound; + InCall *bound = t->bound; if (bound) { - if (bound == task) { + if (bound->task == task) { // yes, the Haskell thread is bound to the current native thread } else { debugTrace(DEBUG_sched, @@ -393,7 +393,7 @@ schedule (Capability *initialCapability, Task *task) } } else { // The thread we want to run is unbound. - if (task->tso) { + if (task->incall->tso) { debugTrace(DEBUG_sched, "this OS thread cannot run thread %lu", (unsigned long)t->id); @@ -441,7 +441,7 @@ run_thread: ASSERT_FULL_CAPABILITY_INVARIANTS(cap,task); ASSERT(t->cap == cap); - ASSERT(t->bound ? t->bound->cap == cap : 1); + ASSERT(t->bound ? t->bound->task->cap == cap : 1); prev_what_next = t->what_next; @@ -639,9 +639,9 @@ shouldYieldCapability (Capability *cap, Task *task) // and this task it bound). return (waiting_for_gc || cap->returning_tasks_hd != NULL || - (!emptyRunQueue(cap) && (task->tso == NULL + (!emptyRunQueue(cap) && (task->incall->tso == NULL ? cap->run_queue_hd->bound != NULL - : cap->run_queue_hd->bound != task))); + : cap->run_queue_hd->bound != task->incall))); } // This is the single place where a Task goes to sleep. There are @@ -768,7 +768,7 @@ schedulePushWork(Capability *cap USED_IF_THREADS, next = t->_link; t->_link = END_TSO_QUEUE; if (t->what_next == ThreadRelocated - || t->bound == task // don't move my bound thread + || t->bound == task->incall // don't move my bound thread || tsoLocked(t)) { // don't move a locked thread setTSOLink(cap, prev, t); prev = t; @@ -781,9 +781,9 @@ schedulePushWork(Capability *cap USED_IF_THREADS, } else { appendToRunQueue(free_caps[i],t); - traceEventMigrateThread (cap, t, free_caps[i]->no); + traceEventMigrateThread (cap, t, free_caps[i]->no); - if (t->bound) { t->bound->cap = free_caps[i]; } + if (t->bound) { t->bound->task->cap = free_caps[i]; } t->cap = free_caps[i]; i++; } @@ -979,13 +979,13 @@ scheduleDetectDeadlock (Capability *cap, Task *task) /* Probably a real deadlock. Send the current main thread the * Deadlock exception. */ - if (task->tso) { - switch (task->tso->why_blocked) { + if (task->incall->tso) { + switch (task->incall->tso->why_blocked) { case BlockedOnSTM: case BlockedOnBlackHole: case BlockedOnException: case BlockedOnMVar: - throwToSingleThreaded(cap, task->tso, + throwToSingleThreaded(cap, task->incall->tso, (StgClosure *)nonTermination_closure); return; default: @@ -1174,8 +1174,8 @@ scheduleHandleStackOverflow (Capability *cap, Task *task, StgTSO *t) /* The TSO attached to this Task may have moved, so update the * pointer to it. */ - if (task->tso == t) { - task->tso = new_t; + if (task->incall->tso == t) { + task->incall->tso = new_t; } pushOnRunQueue(cap,new_t); } @@ -1285,7 +1285,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) if (t->bound) { - if (t->bound != task) { + if (t->bound != task->incall) { #if !defined(THREADED_RTS) // Must be a bound thread that is not the topmost one. Leave // it on the run queue until the stack has unwound to the @@ -1302,12 +1302,12 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) #endif } - ASSERT(task->tso == t); + ASSERT(task->incall->tso == t); if (t->what_next == ThreadComplete) { if (task->ret) { // NOTE: return val is tso->sp[1] (see StgStartup.hc) - *(task->ret) = (StgClosure *)task->tso->sp[1]; + *(task->ret) = (StgClosure *)task->incall->tso->sp[1]; } task->stat = Success; } else { @@ -1325,7 +1325,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) } } #ifdef DEBUG - removeThreadLabel((StgWord)task->tso->id); + removeThreadLabel((StgWord)task->incall->tso->id); #endif // We no longer consider this thread and task to be bound to @@ -1336,7 +1336,7 @@ scheduleHandleThreadFinished (Capability *cap STG_UNUSED, Task *task, StgTSO *t) // re-used). This was a real bug: the GC updated // tso->bound->tso which lead to a deadlock. t->bound = NULL; - task->tso = NULL; + task->incall->tso = NULL; return rtsTrue; // tells schedule() to return } @@ -1586,7 +1586,6 @@ forkProcess(HsStablePtr *entry ) { #ifdef FORKPROCESS_PRIMOP_SUPPORTED - Task *task; pid_t pid; StgTSO* t,*next; Capability *cap; @@ -1661,7 +1660,7 @@ forkProcess(HsStablePtr *entry // Any suspended C-calling Tasks are no more, their OS threads // don't exist now: - cap->suspended_ccalling_tasks = NULL; + cap->suspended_ccalls = NULL; // Empty the threads lists. Otherwise, the garbage // collector may attempt to resurrect some of these threads. @@ -1669,17 +1668,7 @@ forkProcess(HsStablePtr *entry generations[g].threads = END_TSO_QUEUE; } - // Wipe the task list, except the current Task. - ACQUIRE_LOCK(&sched_mutex); - for (task = all_tasks; task != NULL; task=task->all_link) { - if (task != cap->running_task) { -#if defined(THREADED_RTS) - initMutex(&task->lock); // see #1391 -#endif - discardTask(task); - } - } - RELEASE_LOCK(&sched_mutex); + discardTasksExcept(cap->running_task); #if defined(THREADED_RTS) // Wipe our spare workers list, they no longer exist. New @@ -1747,35 +1736,41 @@ deleteAllThreads ( Capability *cap ) } /* ----------------------------------------------------------------------------- - Managing the suspended_ccalling_tasks list. + Managing the suspended_ccalls list. Locks required: sched_mutex -------------------------------------------------------------------------- */ STATIC_INLINE void suspendTask (Capability *cap, Task *task) { - ASSERT(task->next == NULL && task->prev == NULL); - task->next = cap->suspended_ccalling_tasks; - task->prev = NULL; - if (cap->suspended_ccalling_tasks) { - cap->suspended_ccalling_tasks->prev = task; - } - cap->suspended_ccalling_tasks = task; + InCall *incall; + + incall = task->incall; + ASSERT(incall->next == NULL && incall->prev == NULL); + incall->next = cap->suspended_ccalls; + incall->prev = NULL; + if (cap->suspended_ccalls) { + cap->suspended_ccalls->prev = incall; + } + cap->suspended_ccalls = incall; } STATIC_INLINE void recoverSuspendedTask (Capability *cap, Task *task) { - if (task->prev) { - task->prev->next = task->next; + InCall *incall; + + incall = task->incall; + if (incall->prev) { + incall->prev->next = incall->next; } else { - ASSERT(cap->suspended_ccalling_tasks == task); - cap->suspended_ccalling_tasks = task->next; + ASSERT(cap->suspended_ccalls == incall); + cap->suspended_ccalls = incall->next; } - if (task->next) { - task->next->prev = task->prev; + if (incall->next) { + incall->next->prev = incall->prev; } - task->next = task->prev = NULL; + incall->next = incall->prev = NULL; } /* --------------------------------------------------------------------------- @@ -1832,7 +1827,8 @@ suspendThread (StgRegTable *reg) } // Hand back capability - task->suspended_tso = tso; + task->incall->suspended_tso = tso; + task->incall->suspended_cap = cap; ACQUIRE_LOCK(&cap->lock); @@ -1853,6 +1849,7 @@ StgRegTable * resumeThread (void *task_) { StgTSO *tso; + InCall *incall; Capability *cap; Task *task = task_; int saved_errno; @@ -1865,18 +1862,22 @@ resumeThread (void *task_) saved_winerror = GetLastError(); #endif - cap = task->cap; + incall = task->incall; + cap = incall->suspended_cap; + task->cap = cap; + // Wait for permission to re-enter the RTS with the result. waitForReturnCapability(&cap,task); // we might be on a different capability now... but if so, our - // entry on the suspended_ccalling_tasks list will also have been + // entry on the suspended_ccalls list will also have been // migrated. // Remove the thread from the suspended list recoverSuspendedTask(cap,task); - tso = task->suspended_tso; - task->suspended_tso = NULL; + tso = incall->suspended_tso; + incall->suspended_tso = NULL; + incall->suspended_cap = NULL; tso->_link = END_TSO_QUEUE; // no write barrier reqd traceEventRunThread(cap, tso); @@ -1954,10 +1955,10 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) // This TSO is now a bound thread; make the Task and TSO // point to each other. - tso->bound = task; + tso->bound = task->incall; tso->cap = cap; - task->tso = tso; + task->incall->tso = tso; task->ret = ret; task->stat = NoStatus; @@ -1980,23 +1981,8 @@ scheduleWaitThread (StgTSO* tso, /*[out]*/HaskellObj* ret, Capability *cap) * ------------------------------------------------------------------------- */ #if defined(THREADED_RTS) -void OSThreadProcAttr -workerStart(Task *task) +void scheduleWorker (Capability *cap, Task *task) { - Capability *cap; - - // See startWorkerTask(). - ACQUIRE_LOCK(&task->lock); - cap = task->cap; - RELEASE_LOCK(&task->lock); - - if (RtsFlags.ParFlags.setAffinity) { - setThreadAffinity(cap->no, n_capabilities); - } - - // set the thread-local pointer to the Task: - taskEnter(task); - // schedule() runs without a lock. cap = schedule(cap,task); @@ -2062,6 +2048,8 @@ initScheduler(void) initSparkPools(); #endif + RELEASE_LOCK(&sched_mutex); + #if defined(THREADED_RTS) /* * Eagerly start one worker to run each Capability, except for @@ -2075,13 +2063,11 @@ initScheduler(void) for (i = 1; i < n_capabilities; i++) { cap = &capabilities[i]; ACQUIRE_LOCK(&cap->lock); - startWorkerTask(cap, workerStart); + startWorkerTask(cap); RELEASE_LOCK(&cap->lock); } } #endif - - RELEASE_LOCK(&sched_mutex); } void @@ -2102,7 +2088,7 @@ exitScheduler( sched_state = SCHED_INTERRUPTING; waitForReturnCapability(&task->cap,task); scheduleDoGC(task->cap,task,rtsFalse); - ASSERT(task->tso == NULL); + ASSERT(task->incall->tso == NULL); releaseCapability(task->cap); } sched_state = SCHED_SHUTTING_DOWN; @@ -2112,7 +2098,7 @@ exitScheduler( nat i; for (i = 0; i < n_capabilities; i++) { - ASSERT(task->tso == NULL); + ASSERT(task->incall->tso == NULL); shutdownCapability(&capabilities[i], task, wait_foreign); } } @@ -2161,7 +2147,7 @@ performGC_(rtsBool force_major) // We must grab a new Task here, because the existing Task may be // associated with a particular Capability, and chained onto the - // suspended_ccalling_tasks queue. + // suspended_ccalls queue. task = newBoundTask(); waitForReturnCapability(&task->cap,task); @@ -2368,8 +2354,8 @@ threadStackUnderflow (Capability *cap, Task *task, StgTSO *tso) // The TSO attached to this Task may have moved, so update the // pointer to it. - if (task->tso == tso) { - task->tso = new_tso; + if (task->incall->tso == tso) { + task->incall->tso = new_tso; } unlockTSO(new_tso); |